diff options
Diffstat (limited to 'lib/et/src/et_collector.erl')
-rw-r--r-- | lib/et/src/et_collector.erl | 1204 |
1 files changed, 1204 insertions, 0 deletions
diff --git a/lib/et/src/et_collector.erl b/lib/et/src/et_collector.erl new file mode 100644 index 0000000000..ea23c188f7 --- /dev/null +++ b/lib/et/src/et_collector.erl @@ -0,0 +1,1204 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2000-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%%---------------------------------------------------------------------- +%% Purpose: Collect trace events and provide a backing storage +%% appropriate for iteration +%%---------------------------------------------------------------------- + +-module(et_collector). + +-behaviour(gen_server). + +%% External exports +-export([ + start_link/1, + stop/1, + + report/2, + report_event/5, + report_event/6, + + iterate/3, + iterate/5, + + start_trace_client/3, + start_trace_port/1, + %% load_event_file/2, + save_event_file/3, + clear_table/1, + + get_global_pid/0, + %% get_table_handle/1, + change_pattern/2, + make_key/2, + + dict_insert/3, + dict_delete/2, + dict_lookup/2, + dict_match/2, + multicast/2 + ]). + +%% gen_server callbacks +-export([init/1,terminate/2, code_change/3, + handle_call/3, handle_cast/2, handle_info/2]). + +-include("../include/et.hrl"). + +-record(state, {parent_pid, + event_tab, + dict_tab, + event_order, + subscribers, + file, + trace_pattern, + trace_port, + trace_max_queue, + trace_nodes, + trace_global}). + +-record(file, {name, desc, event_opt, file_opt, table_opt}). + +-record(table_handle, {collector_pid, event_tab, event_order, filter}). + +-record(trace_ts, {trace_ts, event_ts}). +-record(event_ts, {event_ts, trace_ts}). + +%%%---------------------------------------------------------------------- +%%% Client side +%%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% start_link(Options) -> {ok, CollectorPid} | {error, Reason} +%% +%% Start a collector process +%% +%% The collector collects trace events and keeps them ordered by their +%% timestamp. The timestamp may either reflect the time when the +%% actual trace data was generated (trace_ts) or when the trace data +%% was transformed into an event record (event_ts). If the time stamp +%% is missing in the trace data (missing timestamp option to +%% erlang:trace/4) the trace_ts will be set to the event_ts. +%% +%% Events are reported to the collector directly with the report +%% function or indirectly via one or more trace clients. All reported +%% events are first filtered thru the collector filter before they are +%% stored by the collector. By replacing the default collector filter +%% with a customized dito it is possible to allow any trace data as +%% input. The collector filter is a dictionary entry with the +%% predefined key {filter, collector} and the value is a fun of +%% arity 1. See et_selector:parse_event/2 for interface details, +%% such as which erlang:trace/1 tuples that are accepted. +%% +%% The collector has a built-in dictionary service. Any term may be +%% stored as value in the dictionary and bound to a unique key. When +%% new values are inserted with an existing key, the new values will +%% overwrite the existing ones. Processes may subscribe on dictionary +%% updates by using {subscriber, pid()} as dictionary key. All +%% dictionary updates will be propagated to the subscriber processes +%% matching the pattern {{subscriber, '_'}, '_'} where the first '_' +%% is interpreted as a pid(). +%% +%% In global trace mode, the collector will automatically start +%% tracing on all connected Erlang nodes. When a node connects, a port +%% tracer will be started on that node and a corresponding trace +%% client on the collector node. By default the global trace pattern +%% is 'max'. +%% +%% Options = [option()] +%% +%% option() = +%% {parent_pid, pid()} | +%% {event_order, event_order()} | +%% {dict_insert, {filter, collector}, collector_fun()} | +%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} | +%% {dict_insert, {subscriber, pid()}, dict_val()} | +%% {dict_insert, dict_key(), dict_val()} | +%% {dict_delete, dict_key()} | +%% {trace_client, trace_client()} | +%% {trace_global, boolean()} | +%% {trace_pattern, trace_pattern()} | +%% {trace_port, integer()} | +%% {trace_max_queue, integer()} +%% +%% event_order() = trace_ts | event_ts +%% trace_pattern() = detail_level() | dbg_match_spec() +%% detail_level() = min | max | integer(X) when X =< 0, X >= 100 +%% trace_client() = +%% {event_file, file_name()} | +%% {dbg_trace_type(), dbg_trace_parameters()} +%% file_name() = string() +%% collector_fun() = trace_filter_fun() | event_filter_fun() +%% trace_filter_fun() = fun(TraceData) -> false | true | {true, NewEvent} +%% event_filter_fun() = fun(Event) -> false | true | {true, NewEvent} +%% event_filter_name() = atom() +%% TraceData = erlang_trace_data() +%% Event = NewEvent = record(event) +%% dict_key() = term() +%% dict_val() = term() +%% +%% CollectorPid = pid() +%% Reason = term() +%%---------------------------------------------------------------------- + +start_link(Options) -> + case parse_opt(Options, default_state(), [], []) of + {ok, S, Dict2, Clients} when S#state.trace_global == false -> + case gen_server:start_link(?MODULE, [S, Dict2], []) of + {ok, Pid} when S#state.parent_pid /= self() -> + unlink(Pid), + start_clients(Pid, Clients); + {ok,Pid} -> + start_clients(Pid, Clients); + {error, Reason} -> + {error, Reason} + end; + {ok, S, Dict2, Clients} when S#state.trace_global == true -> + case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of + {ok, Pid} when S#state.parent_pid /= self() -> + unlink(Pid), + start_clients(Pid, Clients); + {ok,Pid} -> + start_clients(Pid, Clients); + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +default_state() -> + #state{parent_pid = self(), + event_order = trace_ts, + subscribers = [], + trace_global = false, + trace_pattern = undefined, + trace_nodes = [], + trace_port = 4711, + trace_max_queue = 50}. + +parse_opt([], S, Dict, Clients) -> + {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern), + Fun = fun(E) -> et_selector:parse_event(Mod, E) end, + Default = {dict_insert, {filter, collector}, Fun}, + {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients}; +parse_opt([H | T], S, Dict, Clients) -> + case H of + {parent_pid, Parent} when Parent == undefined -> + parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); + {parent_pid, Parent} when pid(Parent) -> + parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); + {event_order, Order} when Order == trace_ts -> + parse_opt(T, S#state{event_order = Order}, Dict, Clients); + {event_order, Order} when Order == event_ts -> + parse_opt(T, S#state{event_order = Order}, Dict, Clients); + {dict_insert, {filter, Name}, Fun} -> + if + atom(Name), function(Fun) -> + parse_opt(T, S, Dict ++ [H], Clients); + true -> + {error, {bad_option, H}} + end; + {dict_insert, {subscriber, Pid}, _Val} -> + if + pid(Pid) -> + parse_opt(T, S, Dict ++ [H], Clients); + true -> + {error, {bad_option, H}} + end; + {dict_insert, _Key, _Val} -> + parse_opt(T, S, Dict ++ [H], Clients); + {dict_delete, _Key} -> + parse_opt(T, S, Dict ++ [H], Clients); + {trace_client, Client = {_, _}} -> + parse_opt(T, S, Dict, Clients ++ [Client]); + {trace_global, Bool} when Bool == false -> + parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); + {trace_global, Bool} when Bool == true -> + parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); + {trace_pattern, {Mod, _} = Pattern} when atom(Mod) -> + parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); + {trace_pattern, undefined = Pattern} -> + parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); + {trace_port, Port} when integer(Port) -> + parse_opt(T, S#state{trace_port = Port}, Dict, Clients); + {trace_max_queue, MaxQueue} when integer(MaxQueue) -> + parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients); + Bad -> + {error, {bad_option, Bad}} + end; +parse_opt(BadList, _S, _Dict, _Clients) -> + {error, {bad_option_list, BadList}}. + +start_clients(CollectorPid, [{Type, Parameters} | T]) -> + start_trace_client(CollectorPid, Type, Parameters), + start_clients(CollectorPid, T); +start_clients(CollectorPid, []) -> + {ok, CollectorPid}. + +%%---------------------------------------------------------------------- +%% stop(CollectorPid) -> ok +%% +%% Stop a collector process +%% +%% CollectorPid = pid() +%%---------------------------------------------------------------------- + +stop(CollectorPid) -> + call(CollectorPid, stop). + +%%---------------------------------------------------------------------- +%% save_event_file(CollectorPid, FileName, Options) -> ok | {error, Reason} +%% +%% Saves the events to a file +%% +%% CollectorPid = pid() +%% FileName = string() +%% Options = [option()] +%% Reason = term() +%% +%% option() = event_option() | file_option() | table_option() +%% event_option() = existing +%% file_option() = write | append +%% table_option() = keep | clear +%% +%% By default the currently stored events (existing) are +%% written to a brand new file (write) and the events are +%% kept (keep) after they have been written to the file. +%% +%% Instead of keeping the events after writing them to file, +%% it is possible to remove all stored events after they +%% have successfully written to file (clear). +%% +%% The options defaults to existing, write and keep. +%%---------------------------------------------------------------------- + +save_event_file(CollectorPid, FileName, Options) -> + call(CollectorPid, {save_event_file, FileName, Options}). + +%%---------------------------------------------------------------------- +%% load_event_file(CollectorPid, FileName) ->{ok, BadBytes} | exit(Reason) +%% +%% Load the event table from a file +%% +%% CollectorPid = pid() +%% FileName = string() +%% BadBytes = integer(X) where X >= 0 +%% Reason = term() +%%---------------------------------------------------------------------- + +load_event_file(CollectorPid, FileName) -> + Fd = make_ref(), + Args = [{file, FileName}, {name, Fd}, {repair, true}, {mode, read_only}], + Fun = fun(Event, {ok, TH}) -> report(TH, Event) end, + case disk_log:open(Args) of + {ok, _} -> + do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, 0); + {repaired, _, _, BadBytes} -> + do_load_event_file(Fun, Fd, start, {ok, CollectorPid}, FileName, BadBytes); + {error, Reason} -> + exit({disk_log_open, FileName, Reason}) + end. + +do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) -> + case disk_log:chunk(Fd, Cont) of + eof -> + {ok, BadBytes}; + {error, Reason} -> + exit({bad_disk_log_chunk, FileName, Reason}); + {Cont2, Events} -> + Acc2 = lists:foldl(Fun, Acc, Events), + do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes); + {Cont2, Events, More} -> + Acc2 = lists:foldl(Fun, Acc, Events), + do_load_event_file(Fun, Fd, Cont2, Acc2, FileName, BadBytes + More) + end. + +%%---------------------------------------------------------------------- +%% report(Handle, TraceOrEvent) +%% +%% Report an event to the collector +%% +%% All events are filtered thru the collector filter, which +%% optionally may transform or discard the event. The first +%% call should use the pid of the collector process as +%% report handle, while subsequent calls should use the +%% table handle. +%% +%% Handle = Initial | Continuation +%% Initial = collector_pid() +%% collector_pid() = pid() +%% Continuation = record(table_handle) +%% +%% TraceOrEvent = record(event) | dbg_trace_tuple() | end_of_trace +%% Reason = term() +%% +%% Returns: {ok, Continuation} | exit(Reason) +%%---------------------------------------------------------------------- + +report(CollectorPid, TraceOrEvent) when pid(CollectorPid) -> + case get_table_handle(CollectorPid) of + {ok, TH} when record(TH, table_handle) -> + report(TH, TraceOrEvent); + {error, Reason} -> + exit(Reason) + end; +report(TH, TraceOrEvent) when record(TH, table_handle) -> + Fun = TH#table_handle.filter, + case Fun(TraceOrEvent) of + false -> + {ok, TH}; + true when record(TraceOrEvent, event) -> + Key = make_key(TH, TraceOrEvent), + case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of + true -> + {ok, TH}; + {'EXIT', _Reason} -> + %% Refresh the report handle and try again + report(TH#table_handle.collector_pid, TraceOrEvent) + end; + {true, Event} when record(Event, event) -> + Key = make_key(TH, Event), + case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of + true -> + {ok, TH}; + {'EXIT', _Reason} -> + %% Refresh the report handle and try again + report(TH#table_handle.collector_pid, TraceOrEvent) + end; + BadEvent -> + TS = erlang:now(), + Contents = [{trace, TraceOrEvent}, {reason, BadEvent}, {filter, Fun}], + Event = #event{detail_level = 0, + trace_ts = TS, + event_ts = TS, + from = bad_filter, + to = bad_filter, + label = bad_filter, + contents = Contents}, + Key = make_key(TH, Event), + case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of + true -> + {ok, TH}; + {'EXIT', _Reason} -> + %% Refresh the report handle and try again + report(TH#table_handle.collector_pid, TraceOrEvent) + end + end; +report(TH, end_of_trace) when record(TH, table_handle) -> + {ok, TH}; +report(_, Bad) -> + exit({bad_event, Bad}). + +report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) -> + report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents). + +report_event(CollectorPid, DetailLevel, From, To, Label, Contents) + when integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100, list(Contents) -> + TS= erlang:now(), + E = #event{detail_level = DetailLevel, + trace_ts = TS, + event_ts = TS, + from = From, + to = To, + label = Label, + contents = Contents}, + report(CollectorPid, E). + +%%---------------------------------------------------------------------- +%% make_key(Type, Stuff) -> Key +%% +%% Makes a key out of an event record or an old key +%% +%% Type = record(table_handle) | trace_ts | event_ts +%% Stuff = record(event) | Key +%% Key = record(event_ts) | record(trace_ts) +%%---------------------------------------------------------------------- + +make_key(TH, Stuff) when record(TH, table_handle) -> + make_key(TH#table_handle.event_order, Stuff); +make_key(trace_ts, Stuff) -> + if + record(Stuff, event) -> + #event{trace_ts = R, event_ts = P} = Stuff, + #trace_ts{trace_ts = R, event_ts = P}; + record(Stuff, trace_ts) -> + Stuff; + record(Stuff, event_ts) -> + #event_ts{trace_ts = R, event_ts = P} = Stuff, + #trace_ts{trace_ts = R, event_ts = P} + end; +make_key(event_ts, Stuff) -> + if + record(Stuff, event) -> + #event{trace_ts = R, event_ts = P} = Stuff, + #event_ts{trace_ts = R, event_ts = P}; + record(Stuff, event_ts) -> + Stuff; + record(Stuff, trace_ts) -> + #trace_ts{trace_ts = R, event_ts = P} = Stuff, + #event_ts{trace_ts = R, event_ts = P} + end. + +%%---------------------------------------------------------------------- +%% get_table_handle(CollectorPid) -> Handle +%% +%% Return a table handle +%% +%% CollectorPid = pid() +%% Handle = record(table_handle) +%%---------------------------------------------------------------------- + +get_table_handle(CollectorPid) when pid(CollectorPid) -> + call(CollectorPid, get_table_handle). + +%%---------------------------------------------------------------------- +%% get_global_pid() -> CollectorPid | exit(Reason) +%% +%% Return a the identity of the globally registered collector +%% if there is any +%% +%% CollectorPid = pid() +%% Reason = term() +%%---------------------------------------------------------------------- + +get_global_pid() -> + case global:whereis_name(?MODULE) of + CollectorPid when pid(CollectorPid) -> + CollectorPid; + undefined -> + exit(global_collector_not_started) + end. + +%%---------------------------------------------------------------------- +%% change_pattern(CollectorPid, RawPattern) -> {old_pattern, TracePattern} +%% +%% Change active trace pattern globally on all trace nodes +%% +%% CollectorPid = pid() +%% RawPattern = {report_module(), extended_dbg_match_spec()} +%% report_module() = atom() | undefined +%% extended_dbg_match_spec()() = detail_level() | dbg_match_spec() +%% RawPattern = detail_level() +%% detail_level() = min | max | integer(X) when X =< 0, X >= 100 +%% TracePattern = {report_module(), dbg_match_spec_match_spec()} +%%---------------------------------------------------------------------- + +change_pattern(CollectorPid, RawPattern) -> + Pattern = et_selector:make_pattern(RawPattern), + call(CollectorPid, {change_pattern, Pattern}). + +%%---------------------------------------------------------------------- +%% dict_insert(CollectorPid, {filter, collector}, FilterFun) -> ok +%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok +%% dict_insert(CollectorPid, Key, Val) -> ok +%% +%% Insert a dictionary entry +%% and send a {et, {dict_insert, Key, Val}} tuple +%% to all registered subscribers. +%% +%% If the entry is a new subscriber, it will imply that +%% the new subscriber process first will get one message +%% for each already stored dictionary entry, before it +%% and all old subscribers will get this particular entry. +%% The collector process links to and then supervises the +%% subscriber process. If the subscriber process dies it +%% will imply that it gets unregistered as with a normal +%% dict_delete/2. +%% +%% CollectorPid = pid() +%% FilterFun = filter_fun() +%% SubscriberPid = pid() +%% Void = term() +%% Key = term() +%% Val = term() +%%---------------------------------------------------------------------- + +dict_insert(CollectorPid, Key = {filter, Name}, Fun) -> + if + atom(Name), function(Fun) -> + call(CollectorPid, {dict_insert, Key, Fun}); + true -> + exit({badarg, Key}) + end; +dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) -> + if + pid(Pid) -> + call(CollectorPid, {dict_insert, Key, Val}); + true -> + exit({badarg, Key}) + end; +dict_insert(CollectorPid, Key, Val) -> + call(CollectorPid, {dict_insert, Key, Val}). + +%%---------------------------------------------------------------------- +%% dict_lookup(CollectorPid, Key) -> [Val] +%% +%% Lookup a dictionary entry and return zero or one value +%% +%% CollectorPid = pid() +%% Key = term() +%% Val = term() +%%---------------------------------------------------------------------- + +dict_lookup(CollectorPid, Key) -> + call(CollectorPid, {dict_lookup, Key}). + +%%---------------------------------------------------------------------- +%% Ddict_delete(CollectorPid, Key) -> ok +%% +%% elete a dictionary entry +%% and send a {et, {dict_delete, Key}} tuple +%% to all registered subscribers. +%% +%% If the deleted entry is a registered subscriber, it will +%% imply that the subscriber process gets is unregistered as +%% subscriber as well as it gets it final message. +%% +%% dict_delete(CollectorPid, {subscriber, SubscriberPid}) +%% dict_delete(CollectorPid, Key) +%% +%% CollectorPid = pid() +%% SubscriberPid = pid() +%% Key = term() +%%---------------------------------------------------------------------- + +dict_delete(CollectorPid, Key) -> + call(CollectorPid, {dict_delete, Key}). + +%%---------------------------------------------------------------------- +%% dict_match(CollectorPid, Pattern) -> [Match] +%% +%% Match some dictionary entries +%% +%% CollectorPid = pid() +%% Pattern = '_' | {key_pattern(), val_pattern()} +%% key_pattern() = ets_match_object_pattern() +%% val_pattern() = ets_match_object_pattern() +%% Match = {key(), val()} +%% key() = term() +%% val() = term() +%%---------------------------------------------------------------------- + +dict_match(CollectorPid, Pattern) -> + call(CollectorPid, {dict_match, Pattern}). + +%%---------------------------------------------------------------------- +%% multicast(_CollectorPid, Msg) -> ok +%% +%% Sends a message to all registered subscribers +%% +%% CollectorPid = pid() +%% Msg = term() +%%---------------------------------------------------------------------- + +multicast(_CollectorPid, Msg = {dict_insert, _Key, _Val}) -> + exit({badarg, Msg}); +multicast(_CollectorPid, Msg = {dict_delete, _Key}) -> + exit({badarg, Msg}); +multicast(CollectorPid, Msg) -> + call(CollectorPid, {multicast, Msg}). + +%%---------------------------------------------------------------------- +%% start_trace_client(CollectorPid, Type, Parameters) -> +%% file_loaded | {trace_client_pid, pid()} | exit(Reason) +%% +%% Load raw Erlang trace from a file, port or process. +%% +%% Type = dbg_trace_client_type() +%% Parameters = dbg_trace_client_parameters() +%% Pid = dbg_trace_client_pid() +%%---------------------------------------------------------------------- + +start_trace_client(CollectorPid, Type, FileName) when Type == event_file -> + load_event_file(CollectorPid, FileName); +start_trace_client(CollectorPid, Type, FileName) when Type == file -> + WaitFor = {make_ref(), end_of_trace}, + EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end, + EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end, + Spec = trace_spec_wrapper(EventFun, EndFun, {self(), {ok, CollectorPid}}), + Pid = dbg:trace_client(Type, FileName, Spec), + unlink(Pid), + Ref = erlang:monitor(process, Pid), + receive + WaitFor -> + erlang:demonitor(Ref), + receive + {'DOWN', Ref, _, _, _} -> + file_loaded + after 0 -> + file_loaded + end; + {'DOWN', Ref, _, _, Reason} -> + exit(Reason) + end; +start_trace_client(CollectorPid, Type, Parameters) -> + EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end, + EndFun = fun(Acc) -> Acc end, + Spec = trace_spec_wrapper(EventFun, EndFun, {ok, CollectorPid}), + Pid = dbg:trace_client(Type, Parameters, Spec), + CollectorPid ! {register_trace_client, Pid}, + unlink(Pid), + {trace_client_pid, Pid}. + +trace_spec_wrapper(EventFun, EndFun, EventInitialAcc) + when function(EventFun), function(EndFun) -> + {fun(Trace, Acc) -> + case Trace == end_of_trace of + true -> EndFun(Acc); + false -> EventFun(Trace, Acc) + end + end, + EventInitialAcc}. + +start_trace_port(Parameters) -> + dbg:tracer(port, dbg:trace_port(ip, Parameters)). + +%%---------------------------------------------------------------------- +%% iterate(Handle, Prev, Limit) -> +%% iterate(Handle, Prev, Limit, undefined, Prev) +%% +%% Iterates over the currently stored events +%% +%% Short for iterate/5. +%%---------------------------------------------------------------------- + +iterate(Handle, Prev, Limit) -> + iterate(Handle, Prev, Limit, undefined, Prev). + +%%---------------------------------------------------------------------- +%% iterate(Handle, Prev, Limit, Fun, Acc) -> NewAcc +%% +%% Iterates over the currently stored events and apply a function for +%% each event. The iteration may be performed forwards or backwards +%% and may be limited to a maximum number of events (abs(Limit)). +%% +%% Handle = collector_pid() | table_handle() +%% Prev = first | last | event_key() +%% Limit = done() | forward() | backward() +%% collector_pid() = pid() +%% table_handle() = record(table_handle) +%% event_key() = +%% done() = 0 +%% forward() = infinity | integer(X) where X > 0 +%% backward() = '-infinity' | integer(X) where X < 0 +%% Fun = fun(Event, Acc) -> NewAcc +%% Acc = NewAcc = term() +%%---------------------------------------------------------------------- + +iterate(_, _, Limit, _, Acc) when Limit == 0 -> + Acc; +iterate(CollectorPid, Prev, Limit, Fun, Acc) when pid(CollectorPid) -> + case get_table_handle(CollectorPid) of + {ok, TH} when record(TH, table_handle) -> + iterate(TH, Prev, Limit, Fun, Acc); + {error, Reason} -> + exit(Reason) + end; +iterate(TH, Prev, Limit, Fun, Acc) when record(TH, table_handle) -> + if + Limit == infinity -> + next_iterate(TH, Prev, Limit, Fun, Acc); + integer(Limit), Limit > 0 -> + next_iterate(TH, Prev, Limit, Fun, Acc); + Limit == '-infinity' -> + prev_iterate(TH, Prev, Limit, Fun, Acc); + integer(Limit), Limit < 0 -> + prev_iterate(TH, Prev, Limit, Fun, Acc) + end. + +next_iterate(TH, Prev = first, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + case catch ets:first(Tab) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): First ~p~n", [?MODULE, ?LINE, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + First -> + lookup_and_apply(TH, Prev, First, Limit, -1, Fun, Acc) + end; +next_iterate(TH, Prev = last, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + case catch ets:last(Tab) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): Last ~p~n", [?MODULE, ?LINE, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + Last -> + lookup_and_apply(TH, Prev, Last, Limit, -1, Fun, Acc) + end; +next_iterate(TH, Prev, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + Key = make_key(TH, Prev), + case catch ets:next(Tab, Key) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): Next ~p -> ~p~n", [?MODULE, ?LINE, Key, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + Next -> + lookup_and_apply(TH, Prev, Next, Limit, -1, Fun, Acc) + end. + +prev_iterate(TH, Prev = first, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + case catch ets:first(Tab) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): First ~p~n", [?MODULE, ?LINE, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + First -> + lookup_and_apply(TH, Prev, First, Limit, 1, Fun, Acc) + end; +prev_iterate(TH, Prev = last, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + case catch ets:last(Tab) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): Last ~p~n", [?MODULE, ?LINE, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + Last -> + lookup_and_apply(TH, Prev, Last, Limit, 1, Fun, Acc) + end; +prev_iterate(TH, Prev, Limit, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + Key = make_key(TH, Prev), + case catch ets:prev(Tab, Key) of + '$end_of_table' -> + Acc; + {'EXIT', _} = Error -> + io:format("~p(~p): Prev ~p -> ~p~n", [?MODULE, ?LINE, Key, Error]), + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + Next -> + lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc) + end. + +lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun == undefined -> + Limit2 = incr(Limit, Incr), + iterate(TH, Next, Limit2, Fun, Next); +lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> + Tab = TH#table_handle.event_tab, + case catch ets:lookup_element(Tab, Next, 2) of + {'EXIT', _} -> + iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); + E when record(E, event) -> + Acc2 = Fun(E, Acc), + Limit2 = incr(Limit, Incr), + iterate(TH, Next, Limit2, Fun, Acc2) + end. + +incr(Val, Incr) -> + if + Val == infinity -> Val; + Val == '-infinity' -> Val; + integer(Val) -> Val + Incr + end. + +%%---------------------------------------------------------------------- +%% clear_table(Handle) -> ok +%% +%% Clear the event table +%% +%% Handle = collector_pid() | table_handle() +%% collector_pid() = pid() +%% table_handle() = record(table_handle) +%%---------------------------------------------------------------------- + +clear_table(CollectorPid) when pid(CollectorPid) -> + call(CollectorPid, clear_table); +clear_table(TH) when record(TH, table_handle) -> + clear_table(TH#table_handle.collector_pid). + +call(CollectorPid, Request) -> + gen_server:call(CollectorPid, Request, infinity). + +%%%---------------------------------------------------------------------- +%%% Callback functions from gen_server +%%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Func: init/1 +%% Returns: {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%%---------------------------------------------------------------------- + +init([InitialS, Dict]) -> + process_flag(trap_exit, true), + case InitialS#state.parent_pid of + undefined -> + ignore; + Pid when pid(Pid) -> + link(Pid) + end, + Funs = [fun init_tables/1, + fun init_global/1, + fun(S) -> lists:foldl(fun do_dict_insert/2, S, Dict) end], + {ok, lists:foldl(fun(F, S) -> F(S) end, InitialS, Funs)}. + +init_tables(S) -> + EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), + DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]), + S#state{event_tab = EventTab, dict_tab = DictTab}. + +init_global(S) -> + case S#state.trace_global of + true -> + EventFun = fun(Event, {ok, TH}) -> report(TH, Event) end, + EndFun = fun(Acc) -> Acc end, + Spec = trace_spec_wrapper(EventFun, EndFun, {ok, self()}), + dbg:tracer(process, Spec), + et_selector:change_pattern(S#state.trace_pattern), + net_kernel:monitor_nodes(true), + lists:foreach(fun(N) -> self() ! {nodeup, N} end, nodes()), + S#state{trace_nodes = [node()]}; + false -> + S + end. + +%%---------------------------------------------------------------------- +%% Func: handle_call/3 +%% Returns: {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | (terminate/2 is called) +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_call({multicast, Msg}, _From, S) -> + do_multicast(S#state.subscribers, Msg), + {reply, ok, S}; + +handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) -> + S2 = do_dict_insert(Msg, S), + {reply, ok, S2}; + +handle_call(Msg = {dict_delete, _Key}, _From, S) -> + S2 = do_dict_delete(Msg, S), + {reply, ok, S2}; + +handle_call({dict_lookup, Key}, _From, S) -> + Reply = ets:lookup(S#state.dict_tab, Key), + {reply, Reply, S}; + +handle_call({dict_match, Pattern}, _From, S) -> + case catch ets:match_object(S#state.dict_tab, Pattern) of + {'EXIT', _Reason} -> + {reply, [], S}; + Matching -> + {reply, Matching, S} + end; + +handle_call(get_table_handle, _From, S) -> + [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}), + TH = #table_handle{collector_pid = self(), + event_tab = S#state.event_tab, + event_order = S#state.event_order, + filter = TableFilter}, + {reply, {ok, TH}, S}; + +handle_call(close, _From, S) -> + case S#state.file of + undefined -> + {reply, {error, file_not_open}, S}; + F -> + Reply = disk_log:close(F#file.desc), + S2 = S#state{file = undefined}, + {reply, Reply, S2} + end; +handle_call({save_event_file, FileName, Options}, _From, S) -> + Default = #file{name = FileName, + event_opt = existing, + file_opt = write, + table_opt = keep}, + case parse_file_options(Default, Options) of + {ok, F} when record(F, file) -> + case file_open(F) of + {ok, Fd} -> + F2 = F#file{desc = Fd}, + {Reply2, S3} = + case F2#file.event_opt of + %% new -> + %% Reply = ok, + %% S2 = S#state{file = F}, + %% {Reply, S2}; + %% + %% insert() -> + %% case S2#state.file of + %% undefined -> + %% ignore; + %% F -> + %% Fd = F#file.desc, + %% ok = disk_log:log(Fd, Event) + %% end. + existing -> + Fun = fun({_, E}, A) -> ok = disk_log:log(Fd, E), A end, + Tab = S#state.event_tab, + Reply = tab_iterate(Fun, Tab, ets:first(Tab), ok), + disk_log:close(Fd), + {Reply, S} + %% all -> + %% Reply = tab_iterate(WriteFun, Tab, ok), + %% S2 = S#state{file = F}, + %% {Reply, S2} + end, + case F2#file.table_opt of + keep -> + {reply, Reply2, S3}; + clear -> + S4 = do_clear_table(S3), + {reply, Reply2, S4} + end; + {error, Reason} -> + {reply, {error, {file_open, Reason}}, S} + end; + {error, Reason} -> + {reply, {error, Reason}, S} + end; + +handle_call({change_pattern, Pattern}, _From, S) -> + Ns = S#state.trace_nodes, + rpc:multicall(Ns, et_selector, change_pattern, [Pattern]), + Reply = {old_pattern, S#state.trace_pattern}, + S2 = S#state{trace_pattern = Pattern}, + {reply, Reply, S2}; + +handle_call(clear_table, _From, S) -> + S2 = do_clear_table(S), + {reply, ok, S2}; + +handle_call(stop, _From, S) -> + do_multicast(S#state.subscribers, close), + case S#state.trace_global of + true -> rpc:multicall(S#state.trace_nodes, dbg, stop, []); + false -> ignore + end, + {stop, shutdown, ok, S}; +handle_call(Request, From, S) -> + ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n", + [?MODULE, self(), Request, From, S]), + {reply, {error, {bad_request, Request}}, S}. + +%%---------------------------------------------------------------------- +%% Func: handle_cast/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_cast(Msg, S) -> + ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n", + [?MODULE, self(), Msg, S]), + {noreply, S}. + +%%---------------------------------------------------------------------- +%% Func: handle_info/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_info({nodeup, Node}, S) -> + Port = S#state.trace_port, + MaxQueue = S#state.trace_max_queue, + case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of + {ok, _} -> + listen_on_trace_port(Node, Port, S); + {error, Reason} when Reason == already_started-> + ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", + [?MODULE, self(), Node, Port, Reason]), + S2 = S#state{trace_port = Port + 1}, + {noreply, S2}; + {badrpc, Reason} -> + ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", + [?MODULE, self(), Node, Port, Reason]), + S2 = S#state{trace_port = Port + 1}, + {noreply, S2}; + {error, Reason} -> + self() ! {nodeup, Node}, + ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n", + [?MODULE, self(), Node, Port, Reason]), + S2 = S#state{trace_port = Port + 1}, + {noreply, S2} + end; + +handle_info({nodedown, Node}, S) -> + {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}}; + +handle_info({register_trace_client, Pid}, S) -> + link(Pid), + {noreply, S}; + +handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid -> + {stop, Reason, S}; +handle_info(Info = {'EXIT', Pid, _Reason}, S) -> + OldSubscribers = S#state.subscribers, + case lists:member(Pid, OldSubscribers) of + true -> + S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), + {noreply, S2}; + false -> + ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", + [?MODULE, self(), Info, S]), + {noreply, S} + end; +handle_info(Info, S) -> + ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", + [?MODULE, self(), Info, S]), + {noreply, S}. + +listen_on_trace_port(Node, Port, S) -> + [_Name, Host] = string:tokens(atom_to_list(Node), [$@]), + case catch start_trace_client(self(), ip, {Host, Port}) of + {trace_client_pid, RemotePid} -> + rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]), + link(RemotePid), + S2 = S#state{trace_nodes = [Node | S#state.trace_nodes], + trace_port = Port + 1}, + {noreply, S2}; + {'EXIT', Reason} when Reason == already_started-> + ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n", + [?MODULE, self(), Node, Port, Reason]), + S2 = S#state{trace_port = Port + 1}, + {noreply, S2}; + {'EXIT', Reason} -> + self() ! {nodeup, Node}, + ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n", + [?MODULE, self(), Node, Port, Reason]), + S2 = S#state{trace_port = Port + 1}, + {noreply, S2} + end. + +%%---------------------------------------------------------------------- +%% Func: terminate/2 +%% Purpose: Shutdown the server +%% Returns: any (ignored by gen_server) +%%---------------------------------------------------------------------- + +terminate(Reason, S) -> + Fun = fun(Pid) -> exit(Pid, Reason) end, + lists:foreach(Fun, S#state.subscribers). + +%%---------------------------------------------------------------------- +%% Func: code_change/3 +%% Purpose: Convert process state when code is changed +%% Returns: {ok, NewState} +%%---------------------------------------------------------------------- + +code_change(_OldVsn, S, _Extra) -> + {ok, S}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +do_clear_table(S) -> + OldTab = S#state.event_tab, + ets:delete(OldTab), + NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), + S#state{event_tab = NewTab}. + +do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) -> + OldSubscribers = S#state.subscribers, + NewSubscribers = + case lists:member(Pid, OldSubscribers) of + true -> + OldSubscribers; + false -> + link(Pid), + All = ets:match_object(S#state.dict_tab, '_'), + lists:foreach(fun({K, V}) -> Pid ! {et, {dict_insert, K, V}} end, All), + [Pid | OldSubscribers] + end, + do_multicast(NewSubscribers, Msg), + ets:insert(S#state.dict_tab, {Key, Val}), + S#state{subscribers = NewSubscribers}; +do_dict_insert(Msg = {dict_insert, Key, Val}, S) -> + do_multicast(S#state.subscribers, Msg), + ets:insert(S#state.dict_tab, {Key, Val}), + S. + +do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) -> + OldSubscribers = S#state.subscribers, + do_multicast(OldSubscribers, Msg), + ets:delete(S#state.dict_tab, Key), + case lists:member(Pid, OldSubscribers) of + true -> + unlink(Pid), + S#state{subscribers = OldSubscribers -- [Pid]}; + false -> + S + end; +do_dict_delete({dict_delete, {filter, collector}}, S) -> + S; +do_dict_delete(Msg = {dict_delete, Key}, S) -> + do_multicast(S#state.subscribers, Msg), + ets:delete(S#state.dict_tab, Key), + S. + +tab_iterate(_Fun, _Tab, '$end_of_table', Acc) -> + Acc; +tab_iterate(Fun, Tab, Key, Acc) -> + Acc2 = lists:foldl(Fun, Acc, ets:lookup(Tab, Key)), + tab_iterate(Fun, Tab, ets:next(Tab, Key), Acc2). + +file_open(F) -> + Fd = make_ref(), + case F#file.file_opt of + write -> file:rename(F#file.name, F#file.name ++ ".OLD"); + append -> ignore + end, + Args = [{file, F#file.name}, {name, Fd}, + {repair, true}, {mode, read_write}], + case disk_log:open(Args) of + {ok, _} -> + {ok, Fd}; + {repaired, _, _, BadBytes} -> + ok = error_logger:format("~p: Skipped ~p bad bytes in file: ~p~n", + [?MODULE, BadBytes, F#file.name]), + {ok, Fd}; + {error,Reason} -> + {error,Reason} + end. + +parse_file_options(F, [H | T]) -> + case H of + existing -> parse_file_options(F#file{event_opt = existing} , T); + %%new -> parse_file_options(F#file{event_opt = new} , T); + all -> parse_file_options(F#file{event_opt = all} , T); + write -> parse_file_options(F#file{file_opt = write} , T); + append -> parse_file_options(F#file{file_opt = append} , T); + keep -> parse_file_options(F#file{table_opt = keep} , T); + clear -> parse_file_options(F#file{table_opt = clear} , T); + Bad -> {error, {bad_file_option, Bad}} + end; +parse_file_options(F, []) -> + {ok, F}. + +do_multicast([Pid | Pids], Msg) -> + Pid ! {et, Msg}, + do_multicast(Pids, Msg); +do_multicast([], _Msg) -> + ok. |