%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2000-2013. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%----------------------------------------------------------------------
%% Purpose: Collect trace events and provide a backing storage
%% appropriate for iteration
%%----------------------------------------------------------------------
-module(et_collector).
-behaviour(gen_server).
%% External exports
-export([
start_link/1,
stop/1,
report/2,
report_event/5,
report_event/6,
iterate/3,
iterate/5,
lookup/2,
start_trace_client/3,
start_trace_port/1,
%% load_event_file/2,
save_event_file/3,
clear_table/1,
get_global_pid/0,
%% get_table_handle/1,
get_table_size/1,
change_pattern/2,
make_key/2,
dict_insert/3,
dict_delete/2,
dict_lookup/2,
dict_match/2,
multicast/2
]).
%% Internal export
-export([monitor_trace_port/2]).
%% gen_server callbacks
-export([init/1,terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2]).
-compile([{nowarn_deprecated_function,[{erlang,now,0}]}]).
-include("et_internal.hrl").
-include("../include/et.hrl").
-record(state, {parent_pid,
auto_shutdown, % Optionally shutdown when the last subscriber dies
event_tab_size,
event_tab,
dict_tab,
event_order,
subscribers,
file,
trace_pattern,
trace_port,
trace_max_queue,
trace_nodes,
trace_global}).
-record(file, {name, desc, event_opt, file_opt, table_opt}).
-record(table_handle, {collector_pid, event_tab, event_order, filter}).
-record(trace_ts, {trace_ts, event_ts}).
-record(event_ts, {event_ts, trace_ts}).
%%%----------------------------------------------------------------------
%%% Client side
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% start_link(Options) -> {ok, CollectorPid} | {error, Reason}
%%
%% Start a collector process
%%
%% The collector collects trace events and keeps them ordered by their
%% timestamp. The timestamp may either reflect the time when the
%% actual trace data was generated (trace_ts) or when the trace data
%% was transformed into an event record (event_ts). If the time stamp
%% is missing in the trace data (missing timestamp option to
%% erlang:trace/4) the trace_ts will be set to the event_ts.
%%
%% Events are reported to the collector directly with the report
%% function or indirectly via one or more trace clients. All reported
%% events are first filtered thru the collector filter before they are
%% stored by the collector. By replacing the default collector filter
%% with a customized dito it is possible to allow any trace data as
%% input. The collector filter is a dictionary entry with the
%% predefined key {filter, all} and the value is a fun of
%% arity 1. See et_selector:parse_event/2 for interface details,
%% such as which erlang:trace/1 tuples that are accepted.
%%
%% The collector has a built-in dictionary service. Any term may be
%% stored as value in the dictionary and bound to a unique key. When
%% new values are inserted with an existing key, the new values will
%% overwrite the existing ones. Processes may subscribe on dictionary
%% updates by using {subscriber, pid()} as dictionary key. All
%% dictionary updates will be propagated to the subscriber processes
%% matching the pattern {{subscriber, '_'}, '_'} where the first '_'
%% is interpreted as a pid().
%%
%% In global trace mode, the collector will automatically start
%% tracing on all connected Erlang nodes. When a node connects, a port
%% tracer will be started on that node and a corresponding trace
%% client on the collector node. By default the global trace pattern
%% is 'max'.
%%
%% Options = [option()]
%%
%% option() =
%% {parent_pid, pid()} |
%% {event_order, event_order()} |
%% {dict_insert, {filter, all}, collector_fun()} |
%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
%% {dict_insert, {subscriber, pid()}, dict_val()} |
%% {dict_insert, dict_key(), dict_val()} |
%% {dict_delete, dict_key()} |
%% {trace_client, trace_client()} |
%% {trace_global, boolean()} |
%% {trace_pattern, trace_pattern()} |
%% {trace_port, integer()} |
%% {trace_max_queue, integer()}
%%
%% event_order() = trace_ts | event_ts
%% trace_pattern() = detail_level() | dbg_match_spec()
%% detail_level() = min | max | integer(X) when X >= 0, X =< 100
%% trace_client() =
%% {event_file, file_name()} |
%% {dbg_trace_type(), dbg_trace_parameters()}
%% file_name() = string()
%% collector_fun() = trace_filter_fun() | event_filter_fun()
%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent}
%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent}
%% event_filter_name() = atom()
%% TraceData = erlang_trace_data()
%% Event = NewEvent = record(event)
%% dict_key() = term()
%% dict_val() = term()
%%
%% CollectorPid = pid()
%% Reason = term()
%%----------------------------------------------------------------------
start_link(Options) ->
case parse_opt(Options, default_state(), [], []) of
{ok, S, Dict2, Clients} ->
Res =
case S#state.trace_global of
false ->
gen_server:start_link(?MODULE, [S, Dict2], []);
true ->
gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], [])
end,
case Res of
{ok, Pid} when S#state.parent_pid =/= self() ->
unlink(Pid),
start_clients(Pid, Clients);
{ok,Pid} ->
start_clients(Pid, Clients);
{error, Reason} ->
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.
default_state() ->
#state{parent_pid = self(),
auto_shutdown = false,
event_order = trace_ts,
subscribers = [],
trace_global = false,
trace_pattern = undefined,
trace_nodes = [],
trace_port = 4711,
trace_max_queue = 50}.
parse_opt([], S, Dict, Clients) ->
{Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
{ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
case H of
{parent_pid, Parent} when Parent =:= undefined ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
{parent_pid, Parent} when is_pid(Parent) ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
{auto_shutdown, Bool} when Bool =:= true; Bool =:= false ->
parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
{event_order, Order} when Order =:= trace_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
{event_order, Order} when Order =:= event_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
{dict_insert, {filter, Name}, Fun} ->
if
is_atom(Name), is_function(Fun) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
end;
{dict_insert, {subscriber, Pid}, _Val} ->
if
is_pid(Pid) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
end;
{dict_insert, _Key, _Val} ->
parse_opt(T, S, Dict ++ [H], Clients);
{dict_delete, _Key} ->
parse_opt(T, S, Dict ++ [H], Clients);
{trace_client, Client = {_, _}} ->
parse_opt(T, S, Dict, Clients ++ [Client]);
{trace_global, Bool} when Bool =:= false ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
{trace_global, Bool} when Bool =:= true ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
{trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
{trace_pattern, undefined = Pattern} ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
{trace_port, Port} when is_integer(Port) ->
parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
{trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients);
Bad ->
{error, {bad_option, Bad}}
end;
parse_opt(BadList, _S, _Dict, _Clients) ->
{error, {bad_option_list, BadList}}.
start_clients(CollectorPid, [{Type, Parameters} | T]) ->
start_trace_client(CollectorPid, Type, Parameters),
start_clients(CollectorPid, T);
start_clients(CollectorPid, []) ->
{ok, CollectorPid}.
%%----------------------------------------------------------------------
%% stop(CollectorPid) -> ok
%%
%% Stop a collector process
%%
%% CollectorPid = pid()
%%----------------------------------------------------------------------
stop(CollectorPid) ->
call(CollectorPid, stop).
%%----------------------------------------------------------------------
%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason}
%%
%% Saves the events to a file
%%
%% CollectorPid = pid()
%% FileName = string()
%% Options = [option()]
%% Reason = term()
%%
%% option() = event_option() | file_option() | table_option()
%% event_option() = existing
%% file_option() = write | append
%% table_option() = keep | clear
%%
%% By default the currently stored events (existing) are
%% written to a brand new file (write) and the events are
%% kept (keep) after they have been written to the file.
%%
%% Instead of keeping the events after writing them to file,
%% it is possible to remove all stored events after they
%% have successfully written to file (clear).
%%
%% The options defaults to existing, write and keep.
%%----------------------------------------------------------------------
save_event_file(CollectorPid, FileName, Options) ->
call(CollectorPid, {save_event_file, FileName, Options}).
%%----------------------------------------------------------------------
%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason)
%%
%% Load the event table from a file
%%
%% CollectorPid = pid()
%% FileName = string()
%% BadBytes = integer(X) where X >= 0
%% Reason = term()
%%----------------------------------------------------------------------
load_event_file(CollectorPid, FileName) ->
Fd = make_ref(),
Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}],
Fun = fun(Event, {ok, TH}) -> report(TH, Event) end,
case disk_log:open(Args) of
{ok, _} ->
do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, 0);
{repaired, _, _, BadBytes} ->
do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, BadBytes);
{error, Reason} ->
exit({disk_log_open, FileName, Reason})
end.
do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
case disk_log:chunk(Fd, Cont) of
eof ->
{ok, BadBytes};
{error, Reason} ->
exit({bad_disk_log_chunk, FileName, Reason});
{Cont2, Events} ->
Acc2 = lists:foldl(Fun, Acc, Events),
do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes);
{Cont2, Events, More} ->
Acc2 = lists:foldl(Fun, Acc, Events),
do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes + More)
end.
%%----------------------------------------------------------------------
%% report(Handle, TraceOrEvent)
%%
%% Report an event to the collector
%%
%% All events are filtered thru the collector filter, which
%% optionally may transform or discard the event. The first
%% call should use the pid of the collector process as
%% report handle, while subsequent calls should use the
%% table handle.
%%
%% Handle = Initial | Continuation
%% Initial = collector_pid()
%% collector_pid() = pid()
%% Continuation = record(table_handle)
%%
%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace
%% Reason = term()
%%
%% Returns: {ok, Continuation} | exit(Reason)
%%----------------------------------------------------------------------
report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
{ok, TH} when is_record(TH, table_handle) ->
report(TH, TraceOrEvent);
{error, Reason} ->
exit(Reason)
end;
report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
Fun = TH#table_handle.filter,
case Fun(TraceOrEvent) of
false ->
{ok, TH};
true when is_record(TraceOrEvent, event) ->
Key = make_key(TH, TraceOrEvent),
case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of
true ->
{ok, TH};
{'EXIT', _Reason} ->
%% Refresh the report handle and try again
report(TH#table_handle.collector_pid, TraceOrEvent)
end;
{true, Event} when is_record(Event, event) ->
Key = make_key(TH, Event),
case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of
true ->
{ok, TH};
{'EXIT', _Reason} ->
%% Refresh the report handle and try again
report(TH#table_handle.collector_pid, TraceOrEvent)
end;
BadEvent ->
TS = erlang:now(),
Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Fun}],
Event = #event{detail_level = 0,
trace_ts = TS,
event_ts = TS,
from = bad_filter,
to = bad_filter,
label = bad_filter,
contents = Contents},
Key = make_key(TH, Event),
case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of
true ->
{ok, TH};
{'EXIT', _Reason} ->
%% Refresh the report handle and try again
report(TH#table_handle.collector_pid, TraceOrEvent)
end
end;
report(_, Bad) ->
exit({bad_event, Bad}).
report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents).
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
when is_integer(DetailLevel),
DetailLevel >= ?detail_level_min,
DetailLevel =< ?detail_level_max ->
TS= erlang:now(),
E = #event{detail_level = DetailLevel,
trace_ts = TS,
event_ts = TS,
from = From,
to = To,
label = Label,
contents = Contents},
report(CollectorPid, E).
%%----------------------------------------------------------------------
%% make_key(Type, Stuff) -> Key
%%
%% Makes a key out of an event record or an old key
%%
%% Type = record(table_handle) | trace_ts | event_ts
%% Stuff = record(event) | Key
%% Key = record(event_ts) | record(trace_ts)
%%----------------------------------------------------------------------
make_key(TH, Stuff) when is_record(TH, table_handle) ->
make_key(TH#table_handle.event_order, Stuff);
make_key(trace_ts, Stuff) ->
if
is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P};
is_record(Stuff, trace_ts) ->
Stuff;
is_record(Stuff, event_ts) ->
#event_ts{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P}
end;
make_key(event_ts, Stuff) ->
if
is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P};
is_record(Stuff, event_ts) ->
Stuff;
is_record(Stuff, trace_ts) ->
#trace_ts{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P}
end.
%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
get_table_size(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, get_table_size).
%%----------------------------------------------------------------------
%% get_table_handle(CollectorPid) -> Handle
%%
%% Return a table handle
%%
%% CollectorPid = pid()
%% Handle = record(table_handle)
%%----------------------------------------------------------------------
get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, get_table_handle).
%%----------------------------------------------------------------------
%% get_global_pid() -> CollectorPid | exit(Reason)
%%
%% Return a the identity of the globally registered collector
%% if there is any
%%
%% CollectorPid = pid()
%% Reason = term()
%%----------------------------------------------------------------------
get_global_pid() ->
case global:whereis_name(?MODULE) of
CollectorPid when is_pid(CollectorPid) ->
CollectorPid;
undefined ->
exit(global_collector_not_started)
end.
%%----------------------------------------------------------------------
%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern}
%%
%% Change active trace pattern globally on all trace nodes
%%
%% CollectorPid = pid()
%% RawPattern = {report_module(), extended_dbg_match_spec()}
%% report_module() = atom() | undefined
%% extended_dbg_match_spec()() = detail_level() | dbg_match_spec()
%% RawPattern = detail_level()
%% detail_level() = min | max | integer(X) when X =< 0, X >= 100
%% TracePattern = {report_module(), dbg_match_spec_match_spec()}
%%----------------------------------------------------------------------
change_pattern(CollectorPid, RawPattern) ->
Pattern = et_selector:make_pattern(RawPattern),
call(CollectorPid, {change_pattern, Pattern}).
%%----------------------------------------------------------------------
%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
%% dict_insert(CollectorPid, Key, Val) -> ok
%%
%% Insert a dictionary entry
%% and send a {et, {dict_insert, Key, Val}} tuple
%% to all registered subscribers.
%%
%% If the entry is a new subscriber, it will imply that
%% the new subscriber process first will get one message
%% for each already stored dictionary entry, before it
%% and all old subscribers will get this particular entry.
%% The collector process links to and then supervises the
%% subscriber process. If the subscriber process dies it
%% will imply that it gets unregistered as with a normal
%% dict_delete/2.
%%
%% CollectorPid = pid()
%% FilterFun = filter_fun()
%% SubscriberPid = pid()
%% Void = term()
%% Key = term()
%% Val = term()
%%----------------------------------------------------------------------
dict_insert(CollectorPid, Key = {filter, Name}, Fun) ->
if
is_atom(Name), is_function(Fun) ->
call(CollectorPid, {dict_insert, Key, Fun});
true ->
exit({badarg, Key})
end;
dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) ->
if
is_pid(Pid) ->
call(CollectorPid, {dict_insert, Key, Val});
true ->
exit({badarg, Key})
end;
dict_insert(CollectorPid, Key, Val) ->
call(CollectorPid, {dict_insert, Key, Val}).
%%----------------------------------------------------------------------
%% dict_lookup(CollectorPid, Key) -> [Val]
%%
%% Lookup a dictionary entry and return zero or one value
%%
%% CollectorPid = pid()
%% Key = term()
%% Val = term()
%%----------------------------------------------------------------------
dict_lookup(CollectorPid, Key) ->
call(CollectorPid, {dict_lookup, Key}).
%%----------------------------------------------------------------------
%% Ddict_delete(CollectorPid, Key) -> ok
%%
%% elete a dictionary entry
%% and send a {et, {dict_delete, Key}} tuple
%% to all registered subscribers.
%%
%% If the deleted entry is a registered subscriber, it will
%% imply that the subscriber process gets is unregistered as
%% subscriber as well as it gets it final message.
%%
%% dict_delete(CollectorPid, {subscriber, SubscriberPid})
%% dict_delete(CollectorPid, Key)
%%
%% CollectorPid = pid()
%% SubscriberPid = pid()
%% Key = term()
%%----------------------------------------------------------------------
dict_delete(CollectorPid, Key) ->
call(CollectorPid, {dict_delete, Key}).
%%----------------------------------------------------------------------
%% dict_match(CollectorPid, Pattern) -> [Match]
%%
%% Match some dictionary entries
%%
%% CollectorPid = pid()
%% Pattern = '_' | {key_pattern(), val_pattern()}
%% key_pattern() = ets_match_object_pattern()
%% val_pattern() = ets_match_object_pattern()
%% Match = {key(), val()}
%% key() = term()
%% val() = term()
%%----------------------------------------------------------------------
dict_match(CollectorPid, Pattern) ->
call(CollectorPid, {dict_match, Pattern}).
%%----------------------------------------------------------------------
%% multicast(_CollectorPid, Msg) -> ok
%%
%% Sends a message to all registered subscribers
%%
%% CollectorPid = pid()
%% Msg = term()
%%----------------------------------------------------------------------
multicast(_CollectorPid, Msg = {dict_insert, _Key, _Val}) ->
exit({badarg, Msg});
multicast(_CollectorPid, Msg = {dict_delete, _Key}) ->
exit({badarg, Msg});
multicast(CollectorPid, Msg) ->
call(CollectorPid, {multicast, Msg}).
%%----------------------------------------------------------------------
%% start_trace_client(CollectorPid, Type, Parameters) ->
%% file_loaded | {trace_client_pid, pid()} | exit(Reason)
%%
%% Load raw Erlang trace from a file, port or process.
%%
%% Type = dbg_trace_client_type()
%% Parameters = dbg_trace_client_parameters()
%% Pid = dbg_trace_client_pid()
%%----------------------------------------------------------------------
start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file ->
load_event_file(CollectorPid, FileName);
start_trace_client(CollectorPid, Type, FileName) when Type =:= file ->
WaitFor = {make_ref(), end_of_trace},
EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end,
EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end,
Spec = trace_spec_wrapper(EventFun, EndFun, {self(), {ok, CollectorPid}}),
Pid = dbg:trace_client(Type, FileName, Spec),
unlink(Pid),
Ref = erlang:monitor(process, Pid),
receive
WaitFor ->
erlang:demonitor(Ref, [flush]),
file_loaded;
{'DOWN', Ref, _, _, Reason} ->
exit(Reason)
end;
start_trace_client(CollectorPid, Type, Parameters) ->
EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end,
EndFun = fun(Acc) -> Acc end,
Spec = trace_spec_wrapper(EventFun, EndFun, {ok, CollectorPid}),
Pid = dbg:trace_client(Type, Parameters, Spec),
CollectorPid ! {register_trace_client, Pid},
unlink(Pid),
{trace_client_pid, Pid}.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
when is_function(EventFun), is_function(EndFun) ->
{fun(Trace, Acc) ->
case Trace =:= end_of_trace of
true -> EndFun(Acc);
false -> EventFun(Trace, Acc)
end
end,
EventInitialAcc}.
start_trace_port(Parameters) ->
dbg:tracer(port, dbg:trace_port(ip, Parameters)).
monitor_trace_port(CollectorPid, Parameters) ->
Res = start_trace_port(Parameters),
spawn(fun() ->
MonitorRef = erlang:monitor(process, CollectorPid),
receive
{'DOWN', MonitorRef, _, _, _} ->
dbg:stop_clear()
end
end),
Res.
%%----------------------------------------------------------------------
%% iterate(Handle, Prev, Limit) ->
%% iterate(Handle, Prev, Limit, undefined, Prev)
%%
%% Iterates over the currently stored events
%%
%% Short for iterate/5.
%%----------------------------------------------------------------------
iterate(Handle, Prev, Limit) ->
iterate(Handle, Prev, Limit, undefined, Prev).
%%----------------------------------------------------------------------
%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc
%%
%% Iterates over the currently stored events and apply a function for
%% each event. The iteration may be performed forwards or backwards
%% and may be limited to a maximum number of events (abs(Limit)).
%%
%% Handle = collector_pid() | table_handle()
%% Prev = first | last | event_key()
%% Limit = done() | forward() | backward()
%% collector_pid() = pid()
%% table_handle() = record(table_handle)
%% event_key() =
%% done() = 0
%% forward() = infinity | integer(X) where X > 0
%% backward() = '-infinity' | integer(X) where X < 0
%% Fun = fun(Event, Acc) -> NewAcc
%% Acc = NewAcc = term()
%%----------------------------------------------------------------------
iterate(_, _, Limit, _, Acc) when Limit =:= 0 ->
Acc;
iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
{ok, TH} when is_record(TH, table_handle) ->
iterate(TH, Prev, Limit, Fun, Acc);
{error, Reason} ->
exit(Reason)
end;
iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
if
Limit =:= infinity ->
next_iterate(TH, Prev, Limit, Fun, Acc);
is_integer(Limit), Limit > 0 ->
next_iterate(TH, Prev, Limit, Fun, Acc);
Limit =:= '-infinity' ->
prev_iterate(TH, Prev, Limit, Fun, Acc);
is_integer(Limit), Limit < 0 ->
prev_iterate(TH, Prev, Limit, Fun, Acc)
end.
next_iterate(TH, Prev = first, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
case catch ets:first(Tab) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): First ~p~n", [?MODULE, ?LINE, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
First ->
lookup_and_apply(TH, Prev, First, Limit, -1, Fun, Acc)
end;
next_iterate(TH, Prev = last, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
case catch ets:last(Tab) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): Last ~p~n", [?MODULE, ?LINE, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
Last ->
lookup_and_apply(TH, Prev, Last, Limit, -1, Fun, Acc)
end;
next_iterate(TH, Prev, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
Key = make_key(TH, Prev),
case catch ets:next(Tab, Key) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): Next ~p -> ~p~n", [?MODULE, ?LINE, Key, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
Next ->
lookup_and_apply(TH, Prev, Next, Limit, -1, Fun, Acc)
end.
prev_iterate(TH, Prev = first, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
case catch ets:first(Tab) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): First ~p~n", [?MODULE, ?LINE, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
First ->
lookup_and_apply(TH, Prev, First, Limit, 1, Fun, Acc)
end;
prev_iterate(TH, Prev = last, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
case catch ets:last(Tab) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): Last ~p~n", [?MODULE, ?LINE, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
Last ->
lookup_and_apply(TH, Prev, Last, Limit, 1, Fun, Acc)
end;
prev_iterate(TH, Prev, Limit, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
Key = make_key(TH, Prev),
case catch ets:prev(Tab, Key) of
'$end_of_table' ->
Acc;
{'EXIT', _} = Error ->
io:format("~p(~p): Prev ~p -> ~p~n", [?MODULE, ?LINE, Key, Error]),
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
Next ->
lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc)
end.
lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined ->
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
Tab = TH#table_handle.event_tab,
case catch ets:lookup_element(Tab, Next, 2) of
{'EXIT', _} ->
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
E when is_record(E, event) ->
Acc2 = Fun(E, Acc),
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Acc2)
end.
lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
{ok, TH} when is_record(TH, table_handle) ->
lookup(TH, Key);
{error, Reason} ->
{error, Reason}
end;
lookup(TH, Key) when is_record(TH, table_handle) ->
Tab = TH#table_handle.event_tab,
case catch ets:lookup_element(Tab, Key, 2) of
{'EXIT', _} ->
{error, enoent};
E when is_record(E, event) ->
{ok, E}
end.
incr(Val, Incr) ->
if
Val =:= infinity -> Val;
Val =:= '-infinity' -> Val;
is_integer(Val) -> Val + Incr
end.
%%----------------------------------------------------------------------
%% clear_table(Handle) -> ok
%%
%% Clear the event table
%%
%% Handle = collector_pid() | table_handle()
%% collector_pid() = pid()
%% table_handle() = record(table_handle)
%%----------------------------------------------------------------------
clear_table(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, clear_table);
clear_table(TH) when is_record(TH, table_handle) ->
clear_table(TH#table_handle.collector_pid).
call(CollectorPid, Request) ->
try
gen_server:call(CollectorPid, Request, infinity)
catch
exit:{noproc,_} ->
{error, no_collector}
end.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init([InitialS, Dict]) ->
process_flag(trap_exit, true),
case InitialS#state.parent_pid of
undefined ->
ignore;
Pid when is_pid(Pid) ->
link(Pid)
end,
Funs = [fun init_tables/1,
fun init_global/1,
fun(S) -> lists:foldl(fun do_dict_insert/2, S, Dict) end],
{ok, lists:foldl(fun(F, S) -> F(S) end, InitialS, Funs)}.
init_tables(S) ->
EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]),
S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}.
init_global(S) ->
case S#state.trace_global of
true ->
EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end,
EndFun = fun(Acc) -> Acc end,
Spec = trace_spec_wrapper(EventFun, EndFun, {ok, self()}),
dbg:tracer(process, Spec),
et_selector:change_pattern(S#state.trace_pattern),
net_kernel:monitor_nodes(true),
lists:foreach(fun(N) -> self() ! {nodeup, N} end, nodes()),
S#state{trace_nodes = [node()]};
false ->
S
end.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({multicast, Msg}, _From, S) ->
do_multicast(S#state.subscribers, Msg),
reply(ok, S);
handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
S2 = do_dict_insert(Msg, S),
reply(ok, S2);
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
try
S2 = do_dict_delete(Msg, S),
reply(ok, S2)
catch
throw:{stop, R} ->
opt_unlink(S#state.parent_pid),
{stop, R, S}
end;
handle_call({dict_lookup, Key}, _From, S) ->
Reply = ets:lookup(S#state.dict_tab, Key),
reply(Reply, S);
handle_call({dict_match, Pattern}, _From, S) ->
case catch ets:match_object(S#state.dict_tab, Pattern) of
{'EXIT', _Reason} ->
reply([], S);
Matching ->
reply(Matching, S)
end;
handle_call(get_table_handle, _From, S) ->
[{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}),
TH = #table_handle{collector_pid = self(),
event_tab = S#state.event_tab,
event_order = S#state.event_order,
filter = TableFilter},
reply({ok, TH}, S);
handle_call(get_table_size, _From, S) ->
Size = ets:info(S#state.event_tab, size),
reply({ok, Size}, S);
handle_call(close, _From, S) ->
case S#state.file of
undefined ->
reply({error, file_not_open}, S);
F ->
Reply = disk_log:close(F#file.desc),
S2 = S#state{file = undefined},
reply(Reply, S2)
end;
handle_call({save_event_file, FileName, Options}, _From, S) ->
Default = #file{name = FileName,
event_opt = existing,
file_opt = write,
table_opt = keep},
case parse_file_options(Default, Options) of
{ok, F} when is_record(F, file) ->
case file_open(F) of
{ok, Fd} ->
F2 = F#file{desc = Fd},
{Reply2, S3} =
case F2#file.event_opt of
%% new ->
%% Reply = ok,
%% S2 = S#state{file = F},
%% {Reply, S2};
%%
%% insert() ->
%% case S2#state.file of
%% undefined ->
%% ignore;
%% F ->
%% Fd = F#file.desc,
%% ok = disk_log:log(Fd, Event)
%% end.
existing ->
Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end,
Tab = S#state.event_tab,
Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok),
disk_log:close(Fd),
{Reply, S}
%% all ->
%% Reply = tab_iterate(WriteFun, Tab, ok),
%% S2 = S#state{file = F},
%% {Reply, S2}
end,
case F2#file.table_opt of
keep ->
reply(Reply2, S3);
clear ->
S4 = do_clear_table(S3),
reply(Reply2, S4)
end;
{error, Reason} ->
reply({error, {file_open, Reason}}, S)
end;
{error, Reason} ->
reply({error, Reason}, S)
end;
handle_call({change_pattern, Pattern}, _From, S) ->
Ns = S#state.trace_nodes,
rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
Reply = {old_pattern, S#state.trace_pattern},
S2 = S#state{trace_pattern = Pattern},
reply(Reply, S2);
handle_call(clear_table, _From, S) ->
S2 = do_clear_table(S),
reply(ok, S2);
handle_call(stop, _From, S) ->
do_multicast(S#state.subscribers, close),
case S#state.trace_global of
true -> rpc:multicall(S#state.trace_nodes, dbg, stop_clear, []);
false -> ignore
end,
{stop, shutdown, ok, S};
handle_call(Request, From, S) ->
ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n",
[?MODULE, self(), Request, From, S]),
reply({error, {bad_request, Request}}, S).
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(Msg, S) ->
ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n",
[?MODULE, self(), Msg, S]),
noreply(S).
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info(timeout, S) ->
S2 = check_size(S),
noreply(S2);
handle_info({nodeup, Node}, S) ->
Port = S#state.trace_port,
MaxQueue = S#state.trace_max_queue,
case rpc:call(Node, ?MODULE, monitor_trace_port, [self(), {Port, MaxQueue}]) of
{ok, _} ->
S2 = listen_on_trace_port(Node, Port, S),
noreply(S2);
{error, Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
noreply(S2);
{badrpc, Reason} ->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
noreply(S2);
{error, Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
noreply(S2)
end;
handle_info({nodedown, Node}, S) ->
noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]});
handle_info({register_trace_client, Pid}, S) ->
link(Pid),
noreply(S);
handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
{stop, Reason, S};
handle_info(Info = {'EXIT', Pid, Reason}, S) ->
OldSubscribers = S#state.subscribers,
case lists:member(Pid, OldSubscribers) of
true when Reason =:= shutdown ->
try
S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
noreply(S2)
catch
throw:{stop, R} ->
opt_unlink(S#state.parent_pid),
{stop, R, S}
end;
true ->
opt_unlink(S#state.parent_pid),
{stop, Reason, S};
false ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
noreply(S)
end;
handle_info(Info, S) ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
noreply(S).
listen_on_trace_port(Node, Port, S) ->
[_Name, Host] = string:tokens(atom_to_list(Node), [$@]),
case catch start_trace_client(self(), ip, {Host, Port}) of
{trace_client_pid, RemotePid} ->
rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
link(RemotePid),
S#state{trace_nodes = [Node | S#state.trace_nodes],
trace_port = Port + 1};
{'EXIT', Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S#state{trace_port = Port + 1};
{'EXIT', Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S#state{trace_port = Port + 1}
end.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, S) ->
Fun = fun(Pid) -> exit(Pid, Reason) end,
lists:foreach(Fun, S#state.subscribers).
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, S, _Extra) ->
{ok, S}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
do_clear_table(S) ->
OldTab = S#state.event_tab,
ets:delete(OldTab),
NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
S#state{event_tab = NewTab}.
do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) ->
OldSubscribers = S#state.subscribers,
NewSubscribers =
case lists:member(Pid, OldSubscribers) of
true ->
OldSubscribers;
false ->
link(Pid),
All = ets:match_object(S#state.dict_tab, '_'),
lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All),
[Pid | OldSubscribers]
end,
do_multicast(NewSubscribers, Msg),
Size = ets:info(S#state.event_tab, size),
do_multicast(NewSubscribers, {more_events, Size}),
ets:insert(S#state.dict_tab, {Key, Val}),
S#state{subscribers = NewSubscribers};
do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->
do_multicast(S#state.subscribers, Msg),
ets:insert(S#state.dict_tab, {Key, Val}),
S.
do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->
OldSubscribers = S#state.subscribers,
do_multicast(OldSubscribers, Msg),
ets:delete(S#state.dict_tab, Key),
case lists:member(Pid, OldSubscribers) of
true ->
unlink(Pid),
S2 = S#state{subscribers = OldSubscribers -- [Pid]},
if
S2#state.auto_shutdown,
S2#state.subscribers =:= [] ->
throw({stop, shutdown});
true ->
S2
end;
false ->
S
end;
do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
S;
do_dict_delete(Msg = {dict_delete, Key}, S) ->
do_multicast(S#state.subscribers, Msg),
ets:delete(S#state.dict_tab, Key),
S.
tab_iterate(_Fun, _Tab, '$end_of_table', Acc) ->
Acc;
tab_iterate(Fun, Tab, Key, Acc) ->
Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)),
tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2).
file_open(F) ->
Fd = make_ref(),
case F#file.file_opt of
write -> file:rename(F#file.name, F#file.name ++ ".OLD");
append -> ignore
end,
Args = [{file, F#file.name}, {name, Fd},
{repair, true}, {mode, read_write}],
case disk_log:open(Args) of
{ok, _} ->
{ok, Fd};
{repaired, _, _, BadBytes} ->
ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~p~n",
[?MODULE, BadBytes, F#file.name]),
{ok, Fd};
{error,Reason} ->
{error,Reason}
end.
parse_file_options(F, [H | T]) ->
case H of
existing -> parse_file_options(F#file{event_opt = existing} , T);
%%new -> parse_file_options(F#file{event_opt = new} , T);
all -> parse_file_options(F#file{event_opt = all} , T);
write -> parse_file_options(F#file{file_opt = write} , T);
append -> parse_file_options(F#file{file_opt = append} , T);
keep -> parse_file_options(F#file{table_opt = keep} , T);
clear -> parse_file_options(F#file{table_opt = clear} , T);
Bad -> {error, {bad_file_option, Bad}}
end;
parse_file_options(F, []) ->
{ok, F}.
do_multicast([Pid | Pids], Msg) ->
Pid ! {et, Msg},
do_multicast(Pids, Msg);
do_multicast([], _Msg) ->
ok.
opt_unlink(Pid) ->
if
Pid =:= undefined ->
ignore;
true ->
unlink(Pid)
end.
reply(Reply, #state{subscribers = []} = S) ->
{reply, Reply, S};
reply(Reply, S) ->
{reply, Reply, S, 500}.
noreply(#state{subscribers = []} = S) ->
{noreply, S};
noreply(S) ->
{noreply, S, 500}.
check_size(S) ->
Size = ets:info(S#state.event_tab, size),
if
Size =:= S#state.event_tab_size ->
S;
true ->
%% Tell the subscribers that more events are available
Msg = {more_events, Size},
do_multicast(S#state.subscribers, Msg),
S#state{event_tab_size = Size}
end.