%% 
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2007-2011. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%

%% 
%% @doc Percept database.
%%	
%% 

-module(percept_db).

-export([
	start/0,
	stop/0,
	insert/1,
	select/2,
	select/1,
	consolidate/0
	]).

-include("percept.hrl").
-define(STOP_TIMEOUT, 1000).
%%==========================================================================
%%
%% 		Type definitions 
%%
%%==========================================================================

%% @type activity_option() = 
%%	{ts_min, timestamp()} | 
%%	{ts_max, timestamp()} | 
%%	{ts_exact, bool()} | 
%%	{mfa, {atom(), atom(), byte()}} | 
%%	{state, active | inactive} | 
%%	{id, all | procs | ports | pid() | port()}

%% @type scheduler_option() =
%%	{ts_min, timestamp()} | 
%%	{ts_max, timestamp()} |
%%	{ts_exact, bool()} |
%%	{id, scheduler_id()}

%% @type system_option() = start_ts | stop_ts

%% @type information_option() =
%%	all | procs | ports | pid() | port()




%%==========================================================================
%%
%% 		Interface functions
%%
%%==========================================================================

%% @spec start() -> {started, Pid} | {restarted, Pid}
%%	Pid = pid()
%% @doc Starts the percept database. If something is already registered
%% as `percept_db' it is stopped first and a fresh database is started.

-spec start() -> {'started', pid()} | {'restarted', pid()}.

start() ->
    Registered = erlang:whereis(percept_db),
    if
        Registered =:= undefined ->
            % Nothing registered under the name: plain start.
            {started, do_start()};
        true ->
            % A database is already running: replace it.
            {restarted, restart(Registered)}
    end.

%% @spec restart(pid()) -> pid()
%% @private
%% @doc Stops the given percept database synchronously, then starts a new
%% one and returns the pid of the new database process.

-spec restart(pid())-> pid().

restart(RunningDb) ->
    % The old process must be gone before the name can be registered again.
    stop_sync(RunningDb),
    do_start().

%% @spec do_start() -> pid()
%% @private
%% @doc Spawns the database process, registers it as `percept_db' and
%% returns its pid.

-spec do_start()-> pid().

do_start() ->
    DbPid = spawn(fun init_percept_db/0),
    erlang:register(percept_db, DbPid),
    DbPid.

%% @spec stop() -> not_started | {stopped, Pid}
%%	Pid = pid()
%% @doc Asks the percept database to stop. The stop request is sent
%% asynchronously; `not_started' is returned when nothing is registered
%% as `percept_db'.

-spec stop() -> 'not_started' | {'stopped', pid()}.

stop() ->
    Db = erlang:whereis(percept_db),
    if
        Db =:= undefined ->
            not_started;
        true ->
            erlang:send(Db, {action, stop}),
            {stopped, Db}
    end.

%% @spec stop_sync(pid()) -> true
%% @private
%% @doc Stops the percept database and waits until the process is gone.
%% The process is monitored and asked to stop. If it has not terminated
%% within ?STOP_TIMEOUT milliseconds it is killed. In both cases the
%% function returns only after the 'DOWN' message has been received, so
%% the caller may re-register the `percept_db' name right away.

-spec stop_sync(pid())-> true.

stop_sync(Pid)->
    MonitorRef = erlang:monitor(process, Pid),
    stop(),
    receive
        {'DOWN', MonitorRef, _Type, Pid, _Info}->
            true
    after ?STOP_TIMEOUT->
            % The exit signal is asynchronous. Keep the monitor and wait for
            % the 'DOWN' message; 'kill' cannot be trapped, so it will arrive.
            exit(Pid, kill),
            receive
                {'DOWN', MonitorRef, _, Pid, _} ->
                    true
            end
    end.

%% @spec insert(tuple()) -> ok
%% @doc Hands a trace or profile message to the database process.
%% The message is sent asynchronously to `percept_db'; `ok' is returned
%% as soon as it has been sent.

insert(Trace) ->
    erlang:send(percept_db, {insert, Trace}),
    ok.


%% @spec select({atom(), Options}) -> Result
%% @doc Synchronous call. Selects information based on a query.
%% The query is sent to the `percept_db' process and the caller blocks
%% until a `{result, Match}' message arrives.
%%
%% <p>Queries:</p>
%% <pre>
%% {system, Option}
%%	Option = system_option()
%%	Result = timestamp()
%% {information, Options}
%%	Options = [information_option()]
%%	Result = [#information{}]
%% {scheduler, Options}
%%	Options = [scheduler_option()]
%%	Result = [#activity{}]
%% {activity, Options}
%%	Options = [activity_option()]
%%	Result = [#activity{}]
%% </pre>
%% <p>
%% Note: id options are always combined with OR; all other options are
%% combined with AND.
%% </p>

select(Query) ->
    erlang:send(percept_db, {select, self(), Query}),
    receive
        {result, Match} ->
            Match
    end.

%% @spec select(atom(), list()) -> Result
%% @equiv select({Table,Options})

select(Table, Options) ->
    % Same request/reply exchange as select/1, with the query tuple built here.
    select({Table, Options}).

%% @spec consolidate() -> ok
%% @doc Asks the database to check for timestamp and state-flow
%%	inconsistencies. The request is sent asynchronously to
%%	`percept_db'.

consolidate() ->
    erlang:send(percept_db, {action, consolidate}),
    ok.

%%==========================================================================
%%
%% 		Database loop 
%%
%%==========================================================================

%% Entry point of the database process: creates the private, named ETS
%% tables owned by this process and then enters the message loop.
init_percept_db() ->
    % {TableName, KeyPosition, TableType}
    Tables = [
        % Proc and Port information
        {pdb_info, #information.id, set},
        % Scheduler runnability
        {pdb_scheduler, #activity.timestamp, ordered_set},
        % Process and Port runnability
        {pdb_activity, #activity.timestamp, ordered_set},
        % System status
        {pdb_system, 1, set},
        % System warnings
        {pdb_warnings, 1, ordered_set}
    ],
    lists:foreach(
        fun({Name, KeyPos, Type}) ->
            ets:new(Name, [named_table, private, {keypos, KeyPos}, Type])
        end, Tables),
    put(debug, 0),
    loop_percept_db().

%% Message loop of the database process. Handles one message at a time and
%% recurses until a stop request arrives, at which point it returns
%% `stopped' (ending the process).
loop_percept_db() ->
    receive
        {action, stop} ->
            stopped;
        {action, consolidate} ->
            consolidate_db(),
            loop_percept_db();
        {insert, RawTrace} ->
            % Pids are rewritten to local form before the trace is stored.
            Cleaned = clean_trace(RawTrace),
            insert_trace(Cleaned),
            loop_percept_db();
        {select, Client, Query} ->
            Client ! {result, select_query(Query)},
            loop_percept_db();
        {operate, Client, {Table, {Fun, Acc0}}} ->
            % Fold a caller-supplied fun over one of the tables.
            Folded = ets:foldl(Fun, Acc0, Table),
            Client ! {result, Folded},
            loop_percept_db();
        Other ->
            io:format("loop_percept_db, unhandled query: ~p~n", [Other]),
            loop_percept_db()
    end.

%%==========================================================================
%%
%% 		Auxiliary functions 
%%
%%==========================================================================

%% Cleans trace messages from external pids.
%%
%% Walks the term recursively through tuples and lists and replaces every
%% pid by a pid whose first (node) number is 0, keeping the other two
%% numbers. All other terms are returned unchanged.
%%
%% Lists do not have to be proper: trace data can contain arbitrary user
%% terms (e.g. spawn arguments), so an improper tail is cleaned and kept
%% instead of crashing the database process.

clean_trace(Trace) when is_tuple(Trace) ->
    list_to_tuple(clean_trace(tuple_to_list(Trace)));
clean_trace(Trace) when is_list(Trace) ->
    clean_list(Trace, []);
clean_trace(Trace) when is_pid(Trace) ->
    % pid_to_list/1 yields "<Node.Id.Serial>"; splitting on "<", "." and ">"
    % leaves exactly the three numbers.
    [_Node, Id, Serial] = string:tokens(pid_to_list(Trace), "<.>"),
    erlang:list_to_pid("<0." ++ Id ++ "." ++ Serial ++ ">");
clean_trace(Trace) -> Trace.

%% Cleans every element of a list, accumulating in reverse.
clean_list([], Out) -> lists:reverse(Out);
clean_list([Element | Trace], Out) ->
    clean_list(Trace, [clean_trace(Element) | Out]);
clean_list(ImproperTail, Out) ->
    % Non-list tail of an improper list: clean it and re-attach it as tail.
    lists:reverse(Out, clean_trace(ImproperTail)).


%% Stores one profile or trace message in the database tables.
%%
%% Handled messages:
%%   {profile_start, Ts} / {profile_stop, Ts}    - system start/stop timestamp
%%   {profile, PidOrPort, State, Mfa, Ts}         - process/port runnability
%%   {profile, scheduler, Id, State, Scheds, Ts}  - scheduler activity
%%   {trace_ts, ...}                              - spawn, exit, register,
%%                                                  open and closed events
%% Link/unlink and unregister events are accepted but not stored.
%% Anything else is reported with io:format/2.
%%
%% Returns `ok', or `ignored' for a runnability message that fails the
%% state-flow consistency check.
insert_trace(Trace) ->
    case Trace of
        {profile_start, Ts} ->
            update_system_start_ts(Ts),
            ok;
        {profile_stop, Ts} ->
            update_system_stop_ts(Ts),
            ok;

        %%% erlang:system_profile, option: runnable_procs
        %%% ---------------------------------------------
        {profile, Id, State, Mfa, TS} when is_pid(Id) ->
            insert_runnable_activity(procs, Id, State, Mfa, TS);

        %%% erlang:system_profile, option: runnable_ports
        %%% ---------------------------------------------
        {profile, Id, State, Mfa, TS} when is_port(Id) ->
            insert_runnable_activity(ports, Id, State, Mfa, TS);

        %%% erlang:system_profile, option: scheduler
        {profile, scheduler, Id, State, Scheds, Ts} ->
            % insert scheduler activity
            update_scheduler(#activity{
                id = {scheduler, Id},
                state = State,
                timestamp = Ts,
                where = Scheds}),
            ok;

        %%% erlang:trace, option: procs
        %%% ---------------------------
        {trace_ts, Parent, spawn, Pid, Mfa, TS} ->
            InformativeMfa = mfa2informative(Mfa),
            % Update id_information
            update_information(#information{
                id = Pid, start = TS, parent = Parent, entry = InformativeMfa}),
            update_information_child(Parent, Pid),
            ok;
        {trace_ts, Pid, exit, _Reason, TS} ->
            % Update id_information
            update_information(#information{id = Pid, stop = TS}),
            ok;
        {trace_ts, Pid, register, Name, _Ts} when is_pid(Pid) ->
            % Update id_information
            update_information(#information{id = Pid, name = Name}),
            ok;
        {trace_ts, Port, register, Name, _Ts} when is_port(Port) ->
            % Ports can be registered too. This clause used to repeat the
            % is_pid guard of the clause above and could never match.
            update_information(#information{id = Port, name = Name}),
            ok;
        {trace_ts, _Pid, unregister, _Name, _Ts} ->
            % Not implemented
            ok;
        {trace_ts, Pid, getting_unlinked, _Id, _Ts} when is_pid(Pid) ->
            % Accepted, nothing stored
            ok;
        {trace_ts, Pid, getting_linked, _Id, _Ts} when is_pid(Pid) ->
            % Accepted, nothing stored
            ok;
        {trace_ts, Pid, link, _Id, _Ts} when is_pid(Pid) ->
            % Accepted, nothing stored
            ok;
        {trace_ts, Pid, unlink, _Id, _Ts} when is_pid(Pid) ->
            % Accepted, nothing stored
            ok;

        %%% erlang:trace, option: ports
        %%% ----------------------------
        {trace_ts, Caller, open, Port, Driver, TS} ->
            % Update id_information
            update_information(#information{
                id = Port, entry = Driver, start = TS, parent = Caller}),
            ok;
        {trace_ts, Port, closed, _Reason, Ts} ->
            % Update id_information
            update_information(#information{id = Port, stop = Ts}),
            ok;

        Unhandled ->
            io:format("insert_trace, unhandled: ~p~n", [Unhandled])
    end.

%% Stores one runnability sample for a process (Type = procs) or a port
%% (Type = ports) together with the updated runnable count. Samples that
%% fail the state-flow consistency check are dropped and `ignored' is
%% returned.
insert_runnable_activity(Type, Id, State, Mfa, TS) ->
    case check_activity_consistency(Id, State) of
        invalid_state ->
            ignored;
        ok ->
            % Update runnable count in activity and db
            Rc = get_runnable_count(Type, State),
            update_activity(#activity{
                id = Id,
                state = State,
                timestamp = TS,
                runnable_count = Rc,
                where = Mfa}),
            ok
    end.

%% Unwraps `erlang:apply' entry points so that the stored MFA names the
%% function that is actually applied. For applied funs the module and
%% name are taken from erlang:fun_info/1, with placeholder atoms when a
%% value is missing or empty. Any other term is returned unchanged.
mfa2informative({erlang, apply, [Mod, Fn, Args]}) ->
    mfa2informative({Mod, Fn, Args});
mfa2informative({erlang, apply, [Fun, Args]}) ->
    Info = erlang:fun_info(Fun),
    Mod = fun_info_value(module, Info, undefined_fun_module),
    Fn = fun_info_value(name, Info, undefined_fun_function),
    mfa2informative({Mod, Fn, Args});
mfa2informative(Mfa) ->
    Mfa.

%% Looks up Key in a fun_info property list; a missing or empty value is
%% replaced by Fallback.
fun_info_value(Key, Info, Fallback) ->
    case proplists:get_value(Key, Info, undefined) of
        [] -> Fallback;
        undefined -> Fallback;
        Value -> Value
    end.

%% consolidate_db() -> ok
%% Purpose:
%%	Check start/stop time
%%	Activity consistency
%%
%% When the system start (stop) timestamp is not set, it is derived from
%% the smallest (largest) timestamp found in the tables. If there are no
%% timestamps at all, nothing can be derived and the value is left unset;
%% lists:min/1 and lists:max/1 would fail on an empty list.
%% Finally the runnable counts of all activities are recomputed.

consolidate_db() ->
    io:format("Consolidating...~n"),
    % Check start/stop timestamps
    case select_query({system, start_ts}) of
        undefined ->
            case list_all_ts() of
                [] -> ok;
                StartCandidates ->
                    update_system_start_ts(lists:min(StartCandidates))
            end;
        _ -> ok
    end,
    case select_query({system, stop_ts}) of
        undefined ->
            case list_all_ts() of
                [] -> ok;
                StopCandidates ->
                    update_system_stop_ts(lists:max(StopCandidates))
            end;
        _ -> ok
    end,
    consolidate_runnability(),
    ok.

%% Resets the runnable counters kept in the process dictionary and
%% recomputes the runnable count of every activity, starting at the first
%% key of pdb_activity.
consolidate_runnability() ->
    lists:foreach(
        fun(Type) -> put({runnable, Type}, undefined) end,
        [procs, ports]),
    FirstKey = ets:first(pdb_activity),
    consolidate_runnability_loop(FirstKey).

%% Walks pdb_activity key by key and rewrites each activity with a freshly
%% computed runnable count. Throws `consolidate' when a key does not hold
%% exactly one activity whose id is a pid or a port.
consolidate_runnability_loop('$end_of_table') ->
    ok;
consolidate_runnability_loop(Ts) ->
    case ets:lookup(pdb_activity, Ts) of
        [#activity{id = Id, state = State} = Act] when is_pid(Id); is_port(Id) ->
            % Processes and ports are counted separately.
            Type = if
                       is_pid(Id) -> procs;
                       true -> ports
                   end,
            Count = get_runnable_count(Type, State),
            ets:insert(pdb_activity, Act#activity{runnable_count = Count});
        _ ->
            throw(consolidate)
    end,
    consolidate_runnability_loop(ets:next(pdb_activity, Ts)).

%% Collects every timestamp stored in the database: activity and scheduler
%% timestamps plus the start and stop values of all information records.
%% Only 3-tuples are kept, which drops unset (non-timestamp) values.
list_all_ts() ->
    ActivityTs = [A#activity.timestamp || A <- select_query({activity, []})],
    SchedulerTs = [S#activity.timestamp || S <- select_query({scheduler, []})],
    Infos = select_query({information, all}),
    InfoTs = lists:flatten(
        [[I#information.start, I#information.stop] || I <- Infos]),
    IsTimestamp = fun ({_, _, _}) -> true;
                      (_) -> false
                  end,
    lists:filter(IsTimestamp, ActivityTs ++ SchedulerTs ++ InfoTs).

%% get_runnable_count(Type, State) -> RunnableCount
%% In:
%%	Type = procs | ports
%%	State = active | inactive
%% Out:
%%	RunnableCount = integer()
%% Purpose:
%%	Keeps a running count of runnable processes/ports in the process
%%	dictionary under {runnable, Type}: `active' increments it (an unset
%%	counter becomes 1), `inactive' decrements it. The new count is
%%	stored and returned.
%% Note:
%%	`inactive' with an unset counter fails with badarith. Any other
%%	State is printed and {StoredCount, State} is returned unchanged.

get_runnable_count(Type, State) ->
    Key = {runnable, Type},
    Previous = get(Key),
    if
        State =:= active, Previous =:= undefined ->
            put(Key, 1),
            1;
        State =:= active ->
            Count = Previous + 1,
            put(Key, Count),
            Count;
        State =:= inactive ->
            Count = Previous - 1,
            put(Key, Count),
            Count;
        true ->
            Unhandled = {Previous, State},
            io:format("get_runnable_count, unhandled ~p~n", [Unhandled]),
            Unhandled
    end.

%% Checks the state flow of a single process or port. The previous state
%% of each Id is remembered in the process dictionary under
%% {previous_state, Id}.
%% Returns `invalid_state' when the new state repeats the previous one, or
%% when the very first state seen for Id is `inactive'; otherwise the new
%% state is remembered and `ok' is returned.
check_activity_consistency(Id, State) ->
    Key = {previous_state, Id},
    Previous = get(Key),
    if
        Previous =:= State ->
            io:format("check_activity_consistency, state flow invalid.~n"),
            invalid_state;
        Previous =:= undefined, State == inactive ->
            invalid_state;
        true ->
            put(Key, State),
            ok
    end.
%%%
%%% select_query
%%% In:
%%%	Query = {Table, Option}
%%%	Table = system | activity | scheduler | information
%%% Dispatches the query to the handler for its table. Queries for any
%%% other table are printed and answered with [].

select_query({system, _} = Query) ->
    select_query_system(Query);
select_query({activity, _} = Query) ->
    select_query_activity(Query);
select_query({scheduler, _} = Query) ->
    select_query_scheduler(Query);
select_query({information, _} = Query) ->
    select_query_information(Query);
select_query(Unhandled) ->
    io:format("select_query, unhandled: ~p~n", [Unhandled]),
    [].

%%% select_query_information
%%% Selects #information{} records from pdb_info: all of them, only
%%% processes, only ports, or the record of one given pid/port.
%%% Other queries are printed and answered with [].

select_query_information({information, all}) ->
    select_information(#information{_ = '_'}, []);
select_query_information({information, procs}) ->
    select_information(#information{id = '$1', _ = '_'}, [{is_pid, '$1'}]);
select_query_information({information, ports}) ->
    select_information(#information{id = '$1', _ = '_'}, [{is_port, '$1'}]);
select_query_information({information, Id}) when is_port(Id); is_pid(Id) ->
    select_information(#information{id = Id, _ = '_'}, []);
select_query_information(Unhandled) ->
    io:format("select_query_information, unhandled: ~p~n", [Unhandled]),
    [].

%%% Runs a single-clause match specification returning whole records.
select_information(Pattern, Guards) ->
    ets:select(pdb_info, [{Pattern, Guards, ['$_']}]).

%%% select_query_scheduler
%%% Selects scheduler activities from pdb_scheduler using the AND-ed
%%% constraints built from Options. Non-list options are printed and
%%% answered with [].

select_query_scheduler({scheduler, Options}) when is_list(Options) ->
    Pattern = #activity{
        timestamp = '$1',
        id = '$2',
        state = '$3',
        where = '$4',
        _ = '_'},
    % Only the constraints are used; the collected id options are discarded.
    {Guards, _Ids} = activity_ms_and(Pattern, Options, [], []),
    ets:select(pdb_scheduler, [{Pattern, Guards, ['$_']}]);
select_query_scheduler(Unhandled) ->
    io:format("select_query_scheduler, unhandled: ~p~n", [Unhandled]),
    [].

%%% select_query_system
%%% Returns the stored system start or stop timestamp, or `undefined' when
%%% it has not been set. Other queries are printed and answered with [].

select_query_system({system, Which}) when Which =:= start_ts; Which =:= stop_ts ->
    Key = {system, Which},
    case ets:lookup(pdb_system, Key) of
        [] -> undefined;
        [{Key, Ts}] -> Ts
    end;
select_query_system(Unhandled) ->
    io:format("select_query_system, unhandled: ~p~n", [Unhandled]),
    [].

%%% select_query_activity
%%% Selects activities from pdb_activity. With {ts_exact, true} among the
%%% options the exact-timestamp selection is used, otherwise a plain match
%%% specification select. A failing selection is printed and answered
%%% with [], as are non-list options.

select_query_activity({activity, Options}) when is_list(Options) ->
    Outcome =
        case lists:member({ts_exact, true}, Options) of
            true ->
                (catch select_query_activity_exact_ts(Options));
            false ->
                % The match specification is built outside the catch.
                MatchSpec = activity_ms(Options),
                (catch ets:select(pdb_activity, MatchSpec))
        end,
    case Outcome of
        {'EXIT', Reason} ->
            io:format(" - select_query_activity [ catch! ]: ~p~n", [Reason]),
            [];
        Match ->
            Match
    end;
select_query_activity(Unhandled) ->
    io:format("select_query_activity, unhandled: ~p~n", [Unhandled]),
    [].

%% Selects the activities within [ts_min, ts_max] and extends the result so
%% that it starts exactly at ts_min.
%% Returns [] unless both ts_min and ts_max are present in Options.
%% Otherwise the entry preceding the first hit in pdb_activity (if any) is
%% copied with its timestamp set to ts_min and put in front of the result.
%% Note: the predecessor is looked up by table order only; the option
%% filters are not applied to it.
select_query_activity_exact_ts(Options) ->
    case { proplists:get_value(ts_min, Options, undefined), proplists:get_value(ts_max, Options, undefined) } of
	{undefined, undefined} -> [];
	{undefined, _        } -> [];
	{_        , undefined} -> [];
	{TsMin    , TsMax    } ->
	    % Remove unwanted options
	    Opts = lists_filter([ts_exact], Options),
	    Ms = activity_ms(Opts),
	    case ets:select(pdb_activity, Ms) of
		% no entries within interval
		[] -> 
		    % Look for the first matching activity at or after TsMax instead.
		    Opts2 = lists_filter([ts_max, ts_min], Opts) ++ [{ts_min, TsMax}],
		    Ms2   = activity_ms(Opts2),
		    case ets:select(pdb_activity, Ms2, 1) of
			'$end_of_table' -> [];
			{[E], _}  -> 
			    % This match fails (badmatch) when the lookup of the
			    % preceding key returns no entry.
			    [PrevAct] = ets:lookup(pdb_activity, ets:prev(pdb_activity, E#activity.timestamp)),
			    [PrevAct#activity{ timestamp = TsMin} , E] 
		    end;
		Acts ->
		    [Head| _] = Acts,
		    if
			% The first hit already sits on the lower bound.
			Head#activity.timestamp == TsMin -> Acts;
			true ->
			    % Prepend the preceding entry, re-stamped to TsMin.
			    PrevTs = ets:prev(pdb_activity, Head#activity.timestamp),
			    case ets:lookup(pdb_activity, PrevTs) of
				[] -> Acts;
				[PrevAct] -> [PrevAct#activity{timestamp = TsMin}|Acts]
			    end
		    end
	    end
    end.

%% Removes from Options every {Key, Value} pair whose key equals (==) one
%% of the given keys. Options must consist of 2-tuples.
lists_filter(Keys, Options) ->
    lists:foldl(
        fun (Key, Remaining) ->
            lists:filter(fun ({Name, _}) -> Name /= Key end, Remaining)
        end, Options, Keys).

% Options:
% {ts_min, timestamp()}
% {ts_max, timestamp()}
% {mfa, mfa()}
% {state, active | inactive}
% {id, all | procs | ports | pid() | port()}
%
% All options are combined with AND, except id options, which are combined
% with OR.
% For example: [{ts_min, TS1}, {ts_max, TS2}, {id, PID1}, {id, PORT1}] would be
% ({ts_min, TS1} and {ts_max, TS2} and {id, PID1}) or
% ({ts_min, TS1} and {ts_max, TS2} and {id, PORT1}).
%
% Builds the match specification for pdb_activity: one clause per id
% option, each carrying all AND-ed constraints. Unknown id options are
% printed and dropped.

activity_ms(Opts) ->
    Pattern = #activity{
        timestamp = '$1',
        id = '$2',
        state = '$3',
        where = '$4',
        _ = '_'},
    {Guards, IdOptions} = activity_ms_and(Pattern, Opts, [], []),
    IdVar = Pattern#activity.id,
    Result = ['$_'],
    Clauses = lists:flatmap(
        fun ({id, ports}) ->
                [{Pattern, [{is_port, IdVar} | Guards], Result}];
            ({id, procs}) ->
                [{Pattern, [{is_pid, IdVar} | Guards], Result}];
            ({id, Id}) when is_pid(Id); is_port(Id) ->
                [{Pattern, [{'==', IdVar, Id} | Guards], Result}];
            ({id, all}) ->
                [{Pattern, Guards, Result}];
            (Dropped) ->
                io:format("activity_ms id dropped ~p~n", [Dropped]),
                []
        end, IdOptions),
    % Clauses end up in the reverse order of the id options.
    lists:reverse(Clauses).

%% Splits an option list into match specification guards and id options.
%% Guards and id options are prepended to the given accumulators in the
%% order the options are visited. If no id option has been collected at
%% the end, [{id, all}] is returned in its place. Unknown options are
%% printed and dropped.
activity_ms_and(Head, Options, Constraints, IDs) ->
    Collect =
        fun ({ts_min, Min}, {Guards, IdAcc}) ->
                {[{'>=', Head#activity.timestamp, {Min}} | Guards], IdAcc};
            ({ts_max, Max}, {Guards, IdAcc}) ->
                {[{'=<', Head#activity.timestamp, {Max}} | Guards], IdAcc};
            ({id, ID}, {Guards, IdAcc}) ->
                {Guards, [{id, ID} | IdAcc]};
            ({state, State}, {Guards, IdAcc}) ->
                {[{'==', Head#activity.state, State} | Guards], IdAcc};
            ({mfa, Mfa}, {Guards, IdAcc}) ->
                {[{'==', Head#activity.where, {Mfa}} | Guards], IdAcc};
            (Opt, Acc) ->
                io:format("activity_ms_and option dropped ~p~n", [Opt]),
                Acc
        end,
    case lists:foldl(Collect, {Constraints, IDs}, Options) of
        {AllGuards, []} -> {AllGuards, [{id, all}]};
        Collected -> Collected
    end.

% Information = information()

%%%
%%% update_information
%%%
%%% Inserts the record when its id is unknown. Otherwise the stored record
%%% is merged with the new one field by field: a new value replaces the
%%% stored one unless the new value is `undefined' or [].

update_information(#information{id = Id} = NewInfo) ->
    case ets:lookup(pdb_info, Id) of
        [] ->
            ets:insert(pdb_info, NewInfo),
            ok;
        [Stored] ->
            Merged = merge_information_fields(
                tuple_to_list(Stored), tuple_to_list(NewInfo)),
            ets:insert(pdb_info, list_to_tuple(Merged)),
            ok
    end.

%%% Pairs up the stored and the new elements position by position and
%%% keeps the stored element wherever the new one is unset.
merge_information_fields([], _NewElems) ->
    [];
merge_information_fields([Old | Olds], [New | News]) ->
    Kept = case New of
               undefined -> Old;
               [] -> Old;
               _ -> New
           end,
    [Kept | merge_information_fields(Olds, News)].

%%% Records Child as a child of Id: the child is put in front of the
%%% children of the stored record, or a new record holding only the id and
%%% this child is created when Id is unknown.
update_information_child(Id, Child) ->
    Updated =
        case ets:lookup(pdb_info, Id) of
            [] ->
                #information{id = Id, children = [Child]};
            [Parent] ->
                Parent#information{
                    children = [Child | Parent#information.children]}
        end,
    ets:insert(pdb_info, Updated),
    ok.

%%%
%%% update_scheduler / update_activity
%%%
%%% Stores a scheduler activity record in pdb_scheduler.
update_scheduler(SchedulerActivity) ->
    ets:insert(pdb_scheduler, SchedulerActivity).

%%% Stores a process/port activity record in pdb_activity.
update_activity(RunnableActivity) ->
    ets:insert(pdb_activity, RunnableActivity).

%%%
%%% update_system_ts
%%%

%%% Sets the system start timestamp when none is stored. A stored value is
%%% replaced only when ?seconds(Stored, TS) is positive.
%%% NOTE(review): ?seconds is defined outside this file; this presumably
%%% means TS is earlier than the stored start - confirm against percept.hrl.
update_system_start_ts(TS) ->
    Key = {system, start_ts},
    case ets:lookup(pdb_system, Key) of
        [] ->
            ets:insert(pdb_system, {Key, TS});
        [{Key, StartTS}] ->
            DT = ?seconds(StartTS, TS),
            case DT > 0.0 of
                true -> ets:insert(pdb_system, {Key, TS});
                false -> ok
            end;
        Unhandled ->
            io:format("update_system_start_ts, unhandled ~p ~n", [Unhandled])
    end.
	
%%% Sets the system stop timestamp when none is stored. A stored value is
%%% replaced only when ?seconds(Stored, TS) is negative.
%%% NOTE(review): ?seconds is defined outside this file; this presumably
%%% means TS is later than the stored stop - confirm against percept.hrl.
update_system_stop_ts(TS) ->
    Key = {system, stop_ts},
    case ets:lookup(pdb_system, Key) of
        [] ->
            ets:insert(pdb_system, {Key, TS});
        [{Key, StopTS}] ->
            DT = ?seconds(StopTS, TS),
            case DT < 0.0 of
                true -> ets:insert(pdb_system, {Key, TS});
                false -> ok
            end;
        Unhandled ->
            io:format("update_system_stop_ts, unhandled ~p ~n", [Unhandled])
    end.