aboutsummaryrefslogtreecommitdiffstats
path: root/lib/et/src/et_collector.erl
diff options
context:
space:
mode:
authorHÃ¥kan Mattsson <[email protected]>2010-02-03 08:59:06 +0000
committerErlang/OTP <[email protected]>2010-02-03 08:59:06 +0000
commit43f3482adf5eee657e5ba922733dfff6600c4e14 (patch)
tree7ea7b32a171de1a7690102c403a8a946e8a382a8 /lib/et/src/et_collector.erl
parent768da5a5f6312496b9b8a09cca5ea1d6b89a2c1c (diff)
downloadotp-43f3482adf5eee657e5ba922733dfff6600c4e14.tar.gz
otp-43f3482adf5eee657e5ba922733dfff6600c4e14.tar.bz2
otp-43f3482adf5eee657e5ba922733dfff6600c4e14.zip
OTP-8058 The GUI parts are rewritten to use wxWidgets. Thanks Olle
Mattsson! For the time being it is still possible to use the old GS based version of the tool, but it is deprecated. The wxWidgets based version is started by default. A new tutorial has been added to the documentation. It is based on Jayson Vantuyl's article http://souja.net/2009/04/making-sense-of-erlangs-event-tracer.htm l. The functions et:trace_me/4 and et:trace_me/5 has been introduced in order to replace the deprecated functions et:report_event/4 and et:report_event/5. Hopefully the new names makes it a little more obvious what the intended usage of the functions are. A print function has been added to the GUI, in order to enable printing of sequence charts. More functionality for hiding unwanted events has been added to the GUI. The max_events, hide_unknown and display_mode configuration parameters to et_viewer is not used any more. Now the event cache in the Viewer only contains those events that actually are displayed in the GUI.
Diffstat (limited to 'lib/et/src/et_collector.erl')
-rw-r--r--lib/et/src/et_collector.erl330
1 files changed, 210 insertions, 120 deletions
diff --git a/lib/et/src/et_collector.erl b/lib/et/src/et_collector.erl
index ea23c188f7..289537541d 100644
--- a/lib/et/src/et_collector.erl
+++ b/lib/et/src/et_collector.erl
@@ -1,19 +1,19 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 2000-2009. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 2000-2010. All Rights Reserved.
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
%%----------------------------------------------------------------------
@@ -36,6 +36,7 @@
iterate/3,
iterate/5,
+ lookup/2,
start_trace_client/3,
start_trace_port/1,
@@ -45,6 +46,7 @@
get_global_pid/0,
%% get_table_handle/1,
+ get_table_size/1,
change_pattern/2,
make_key/2,
@@ -59,9 +61,12 @@
-export([init/1,terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2]).
+-include("et_internal.hrl").
-include("../include/et.hrl").
-record(state, {parent_pid,
+ auto_shutdown, % Optionally shutdown when the last subscriber dies
+ event_tab_size,
event_tab,
dict_tab,
event_order,
@@ -102,7 +107,7 @@
%% stored by the collector. By replacing the default collector filter
%% with a customized dito it is possible to allow any trace data as
%% input. The collector filter is a dictionary entry with the
-%% predefined key {filter, collector} and the value is a fun of
+%% predefined key {filter, all} and the value is a fun of
%% arity 1. See et_selector:parse_event/2 for interface details,
%% such as which erlang:trace/1 tuples that are accepted.
%%
@@ -126,7 +131,7 @@
%% option() =
%% {parent_pid, pid()} |
%% {event_order, event_order()} |
-%% {dict_insert, {filter, collector}, collector_fun()} |
+%% {dict_insert, {filter, all}, collector_fun()} |
%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
%% {dict_insert, {subscriber, pid()}, dict_val()} |
%% {dict_insert, dict_key(), dict_val()} |
@@ -159,19 +164,16 @@
start_link(Options) ->
case parse_opt(Options, default_state(), [], []) of
- {ok, S, Dict2, Clients} when S#state.trace_global == false ->
- case gen_server:start_link(?MODULE, [S, Dict2], []) of
- {ok, Pid} when S#state.parent_pid /= self() ->
- unlink(Pid),
- start_clients(Pid, Clients);
- {ok,Pid} ->
- start_clients(Pid, Clients);
- {error, Reason} ->
- {error, Reason}
- end;
- {ok, S, Dict2, Clients} when S#state.trace_global == true ->
- case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of
- {ok, Pid} when S#state.parent_pid /= self() ->
+ {ok, S, Dict2, Clients} ->
+ Res =
+ case S#state.trace_global of
+ false ->
+ gen_server:start_link(?MODULE, [S, Dict2], []);
+ true ->
+ gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], [])
+ end,
+ case Res of
+ {ok, Pid} when S#state.parent_pid =/= self() ->
unlink(Pid),
start_clients(Pid, Clients);
{ok,Pid} ->
@@ -185,6 +187,7 @@ start_link(Options) ->
default_state() ->
#state{parent_pid = self(),
+ auto_shutdown = false,
event_order = trace_ts,
subscribers = [],
trace_global = false,
@@ -196,28 +199,30 @@ default_state() ->
parse_opt([], S, Dict, Clients) ->
{Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
- Default = {dict_insert, {filter, collector}, Fun},
+ Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
{ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
case H of
- {parent_pid, Parent} when Parent == undefined ->
+ {parent_pid, Parent} when Parent =:= undefined ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
- {parent_pid, Parent} when pid(Parent) ->
+ {parent_pid, Parent} when is_pid(Parent) ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
- {event_order, Order} when Order == trace_ts ->
+ {auto_shutdown, Bool} when Bool =:= true; Bool =:= false ->
+ parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
+ {event_order, Order} when Order =:= trace_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
- {event_order, Order} when Order == event_ts ->
+ {event_order, Order} when Order =:= event_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
{dict_insert, {filter, Name}, Fun} ->
if
- atom(Name), function(Fun) ->
+ is_atom(Name), is_function(Fun) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
end;
{dict_insert, {subscriber, Pid}, _Val} ->
if
- pid(Pid) ->
+ is_pid(Pid) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
@@ -228,17 +233,17 @@ parse_opt([H | T], S, Dict, Clients) ->
parse_opt(T, S, Dict ++ [H], Clients);
{trace_client, Client = {_, _}} ->
parse_opt(T, S, Dict, Clients ++ [Client]);
- {trace_global, Bool} when Bool == false ->
+ {trace_global, Bool} when Bool =:= false ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
- {trace_global, Bool} when Bool == true ->
+ {trace_global, Bool} when Bool =:= true ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
- {trace_pattern, {Mod, _} = Pattern} when atom(Mod) ->
+ {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
{trace_pattern, undefined = Pattern} ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
- {trace_port, Port} when integer(Port) ->
+ {trace_port, Port} when is_integer(Port) ->
parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
- {trace_max_queue, MaxQueue} when integer(MaxQueue) ->
+ {trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients);
Bad ->
{error, {bad_option, Bad}}
@@ -352,19 +357,19 @@ do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
%% Returns: {ok, Continuation} | exit(Reason)
%%----------------------------------------------------------------------
-report(CollectorPid, TraceOrEvent) when pid(CollectorPid) ->
+report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
- {ok, TH} when record(TH, table_handle) ->
+ {ok, TH} when is_record(TH, table_handle) ->
report(TH, TraceOrEvent);
{error, Reason} ->
exit(Reason)
end;
-report(TH, TraceOrEvent) when record(TH, table_handle) ->
+report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
Fun = TH#table_handle.filter,
case Fun(TraceOrEvent) of
false ->
{ok, TH};
- true when record(TraceOrEvent, event) ->
+ true when is_record(TraceOrEvent, event) ->
Key = make_key(TH, TraceOrEvent),
case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of
true ->
@@ -373,7 +378,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) ->
%% Refresh the report handle and try again
report(TH#table_handle.collector_pid, TraceOrEvent)
end;
- {true, Event} when record(Event, event) ->
+ {true, Event} when is_record(Event, event) ->
Key = make_key(TH, Event),
case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of
true ->
@@ -401,7 +406,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) ->
report(TH#table_handle.collector_pid, TraceOrEvent)
end
end;
-report(TH, end_of_trace) when record(TH, table_handle) ->
+report(TH, end_of_trace) when is_record(TH, table_handle) ->
{ok, TH};
report(_, Bad) ->
exit({bad_event, Bad}).
@@ -410,7 +415,7 @@ report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents).
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
- when integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100, list(Contents) ->
+ when is_integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100 ->
TS= erlang:now(),
E = #event{detail_level = DetailLevel,
trace_ts = TS,
@@ -431,32 +436,38 @@ report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
%% Key = record(event_ts) | record(trace_ts)
%%----------------------------------------------------------------------
-make_key(TH, Stuff) when record(TH, table_handle) ->
+make_key(TH, Stuff) when is_record(TH, table_handle) ->
make_key(TH#table_handle.event_order, Stuff);
make_key(trace_ts, Stuff) ->
if
- record(Stuff, event) ->
+ is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P};
- record(Stuff, trace_ts) ->
+ is_record(Stuff, trace_ts) ->
Stuff;
- record(Stuff, event_ts) ->
+ is_record(Stuff, event_ts) ->
#event_ts{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P}
end;
make_key(event_ts, Stuff) ->
if
- record(Stuff, event) ->
+ is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P};
- record(Stuff, event_ts) ->
+ is_record(Stuff, event_ts) ->
Stuff;
- record(Stuff, trace_ts) ->
+ is_record(Stuff, trace_ts) ->
#trace_ts{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P}
end.
%%----------------------------------------------------------------------
+%%----------------------------------------------------------------------
+
+get_table_size(CollectorPid) when is_pid(CollectorPid) ->
+ call(CollectorPid, get_table_size).
+
+%%----------------------------------------------------------------------
%% get_table_handle(CollectorPid) -> Handle
%%
%% Return a table handle
@@ -465,7 +476,7 @@ make_key(event_ts, Stuff) ->
%% Handle = record(table_handle)
%%----------------------------------------------------------------------
-get_table_handle(CollectorPid) when pid(CollectorPid) ->
+get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, get_table_handle).
%%----------------------------------------------------------------------
@@ -480,7 +491,7 @@ get_table_handle(CollectorPid) when pid(CollectorPid) ->
get_global_pid() ->
case global:whereis_name(?MODULE) of
- CollectorPid when pid(CollectorPid) ->
+ CollectorPid when is_pid(CollectorPid) ->
CollectorPid;
undefined ->
exit(global_collector_not_started)
@@ -505,7 +516,7 @@ change_pattern(CollectorPid, RawPattern) ->
call(CollectorPid, {change_pattern, Pattern}).
%%----------------------------------------------------------------------
-%% dict_insert(CollectorPid, {filter, collector}, FilterFun) -> ok
+%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
%% dict_insert(CollectorPid, Key, Val) -> ok
%%
@@ -532,14 +543,14 @@ change_pattern(CollectorPid, RawPattern) ->
dict_insert(CollectorPid, Key = {filter, Name}, Fun) ->
if
- atom(Name), function(Fun) ->
+ is_atom(Name), is_function(Fun) ->
call(CollectorPid, {dict_insert, Key, Fun});
true ->
exit({badarg, Key})
end;
dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) ->
if
- pid(Pid) ->
+ is_pid(Pid) ->
call(CollectorPid, {dict_insert, Key, Val});
true ->
exit({badarg, Key})
@@ -626,9 +637,9 @@ multicast(CollectorPid, Msg) ->
%% Pid = dbg_trace_client_pid()
%%----------------------------------------------------------------------
-start_trace_client(CollectorPid, Type, FileName) when Type == event_file ->
+start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file ->
load_event_file(CollectorPid, FileName);
-start_trace_client(CollectorPid, Type, FileName) when Type == file ->
+start_trace_client(CollectorPid, Type, FileName) when Type =:= file ->
WaitFor = {make_ref(), end_of_trace},
EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end,
EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end,
@@ -658,9 +669,9 @@ start_trace_client(CollectorPid, Type, Parameters) ->
{trace_client_pid, Pid}.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
- when function(EventFun), function(EndFun) ->
+ when is_function(EventFun), is_function(EndFun) ->
{fun(Trace, Acc) ->
- case Trace == end_of_trace of
+ case Trace =:= end_of_trace of
true -> EndFun(Acc);
false -> EventFun(Trace, Acc)
end
@@ -702,24 +713,24 @@ iterate(Handle, Prev, Limit) ->
%% Acc = NewAcc = term()
%%----------------------------------------------------------------------
-iterate(_, _, Limit, _, Acc) when Limit == 0 ->
+iterate(_, _, Limit, _, Acc) when Limit =:= 0 ->
Acc;
-iterate(CollectorPid, Prev, Limit, Fun, Acc) when pid(CollectorPid) ->
+iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
- {ok, TH} when record(TH, table_handle) ->
+ {ok, TH} when is_record(TH, table_handle) ->
iterate(TH, Prev, Limit, Fun, Acc);
{error, Reason} ->
exit(Reason)
end;
-iterate(TH, Prev, Limit, Fun, Acc) when record(TH, table_handle) ->
+iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
if
- Limit == infinity ->
+ Limit =:= infinity ->
next_iterate(TH, Prev, Limit, Fun, Acc);
- integer(Limit), Limit > 0 ->
+ is_integer(Limit), Limit > 0 ->
next_iterate(TH, Prev, Limit, Fun, Acc);
- Limit == '-infinity' ->
+ Limit =:= '-infinity' ->
prev_iterate(TH, Prev, Limit, Fun, Acc);
- integer(Limit), Limit < 0 ->
+ is_integer(Limit), Limit < 0 ->
prev_iterate(TH, Prev, Limit, Fun, Acc)
end.
@@ -793,7 +804,7 @@ prev_iterate(TH, Prev, Limit, Fun, Acc) ->
lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc)
end.
-lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun == undefined ->
+lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined ->
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
@@ -801,17 +812,33 @@ lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
case catch ets:lookup_element(Tab, Next, 2) of
{'EXIT', _} ->
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
- E when record(E, event) ->
+ E when is_record(E, event) ->
Acc2 = Fun(E, Acc),
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Acc2)
end.
+lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
+ case get_table_handle(CollectorPid) of
+ {ok, TH} when is_record(TH, table_handle) ->
+ lookup(TH, Key);
+ {error, Reason} ->
+ {error, Reason}
+ end;
+lookup(TH, Key) when is_record(TH, table_handle) ->
+ Tab = TH#table_handle.event_tab,
+ case catch ets:lookup_element(Tab, Key, 2) of
+ {'EXIT', _} ->
+ {error, enoent};
+ E when is_record(E, event) ->
+ {ok, E}
+ end.
+
incr(Val, Incr) ->
if
- Val == infinity -> Val;
- Val == '-infinity' -> Val;
- integer(Val) -> Val + Incr
+ Val =:= infinity -> Val;
+ Val =:= '-infinity' -> Val;
+ is_integer(Val) -> Val + Incr
end.
%%----------------------------------------------------------------------
@@ -824,13 +851,18 @@ incr(Val, Incr) ->
%% table_handle() = record(table_handle)
%%----------------------------------------------------------------------
-clear_table(CollectorPid) when pid(CollectorPid) ->
+clear_table(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, clear_table);
-clear_table(TH) when record(TH, table_handle) ->
+clear_table(TH) when is_record(TH, table_handle) ->
clear_table(TH#table_handle.collector_pid).
call(CollectorPid, Request) ->
- gen_server:call(CollectorPid, Request, infinity).
+ try
+ gen_server:call(CollectorPid, Request, infinity)
+ catch
+ exit:{noproc,_} ->
+ {error, no_collector}
+ end.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
@@ -849,7 +881,7 @@ init([InitialS, Dict]) ->
case InitialS#state.parent_pid of
undefined ->
ignore;
- Pid when pid(Pid) ->
+ Pid when is_pid(Pid) ->
link(Pid)
end,
Funs = [fun init_tables/1,
@@ -860,7 +892,7 @@ init([InitialS, Dict]) ->
init_tables(S) ->
EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]),
- S#state{event_tab = EventTab, dict_tab = DictTab}.
+ S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}.
init_global(S) ->
case S#state.trace_global of
@@ -889,44 +921,53 @@ init_global(S) ->
handle_call({multicast, Msg}, _From, S) ->
do_multicast(S#state.subscribers, Msg),
- {reply, ok, S};
+ reply(ok, S);
handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
S2 = do_dict_insert(Msg, S),
- {reply, ok, S2};
+ reply(ok, S2);
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
- S2 = do_dict_delete(Msg, S),
- {reply, ok, S2};
-
+ try
+ S2 = do_dict_delete(Msg, S),
+ reply(ok, S2)
+ catch
+ throw:{stop, R} ->
+ opt_unlink(S#state.parent_pid),
+ {stop, R, S}
+ end;
handle_call({dict_lookup, Key}, _From, S) ->
Reply = ets:lookup(S#state.dict_tab, Key),
- {reply, Reply, S};
+ reply(Reply, S);
handle_call({dict_match, Pattern}, _From, S) ->
case catch ets:match_object(S#state.dict_tab, Pattern) of
{'EXIT', _Reason} ->
- {reply, [], S};
+ reply([], S);
Matching ->
- {reply, Matching, S}
+ reply(Matching, S)
end;
handle_call(get_table_handle, _From, S) ->
- [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}),
+ [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}),
TH = #table_handle{collector_pid = self(),
event_tab = S#state.event_tab,
event_order = S#state.event_order,
filter = TableFilter},
- {reply, {ok, TH}, S};
+ reply({ok, TH}, S);
+
+handle_call(get_table_size, _From, S) ->
+ Size = ets:info(S#state.event_tab, size),
+ reply({ok, Size}, S);
handle_call(close, _From, S) ->
case S#state.file of
undefined ->
- {reply, {error, file_not_open}, S};
+ reply({error, file_not_open}, S);
F ->
Reply = disk_log:close(F#file.desc),
S2 = S#state{file = undefined},
- {reply, Reply, S2}
+ reply(Reply, S2)
end;
handle_call({save_event_file, FileName, Options}, _From, S) ->
Default = #file{name = FileName,
@@ -934,7 +975,7 @@ handle_call({save_event_file, FileName, Options}, _From, S) ->
file_opt = write,
table_opt = keep},
case parse_file_options(Default, Options) of
- {ok, F} when record(F, file) ->
+ {ok, F} when is_record(F, file) ->
case file_open(F) of
{ok, Fd} ->
F2 = F#file{desc = Fd},
@@ -966,16 +1007,16 @@ handle_call({save_event_file, FileName, Options}, _From, S) ->
end,
case F2#file.table_opt of
keep ->
- {reply, Reply2, S3};
+ reply(Reply2, S3);
clear ->
S4 = do_clear_table(S3),
- {reply, Reply2, S4}
+ reply(Reply2, S4)
end;
{error, Reason} ->
- {reply, {error, {file_open, Reason}}, S}
+ reply({error, {file_open, Reason}}, S)
end;
{error, Reason} ->
- {reply, {error, Reason}, S}
+ reply({error, Reason}, S)
end;
handle_call({change_pattern, Pattern}, _From, S) ->
@@ -983,11 +1024,11 @@ handle_call({change_pattern, Pattern}, _From, S) ->
rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
Reply = {old_pattern, S#state.trace_pattern},
S2 = S#state{trace_pattern = Pattern},
- {reply, Reply, S2};
+ reply(Reply, S2);
handle_call(clear_table, _From, S) ->
S2 = do_clear_table(S),
- {reply, ok, S2};
+ reply(ok, S2);
handle_call(stop, _From, S) ->
do_multicast(S#state.subscribers, close),
@@ -999,7 +1040,7 @@ handle_call(stop, _From, S) ->
handle_call(Request, From, S) ->
ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n",
[?MODULE, self(), Request, From, S]),
- {reply, {error, {bad_request, Request}}, S}.
+ reply({error, {bad_request, Request}}, S).
%%----------------------------------------------------------------------
%% Func: handle_cast/2
@@ -1011,7 +1052,7 @@ handle_call(Request, From, S) ->
handle_cast(Msg, S) ->
ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n",
[?MODULE, self(), Msg, S]),
- {noreply, S}.
+ noreply(S).
%%----------------------------------------------------------------------
%% Func: handle_info/2
@@ -1020,54 +1061,67 @@ handle_cast(Msg, S) ->
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
+handle_info(timeout, S) ->
+ S2 = check_size(S),
+ noreply(S2);
handle_info({nodeup, Node}, S) ->
Port = S#state.trace_port,
MaxQueue = S#state.trace_max_queue,
case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of
{ok, _} ->
- listen_on_trace_port(Node, Port, S);
- {error, Reason} when Reason == already_started->
+ S2 = listen_on_trace_port(Node, Port, S),
+ noreply(S2);
+ {error, Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ noreply(S2);
{badrpc, Reason} ->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ noreply(S2);
{error, Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2}
+ noreply(S2)
end;
handle_info({nodedown, Node}, S) ->
- {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}};
+ noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]});
handle_info({register_trace_client, Pid}, S) ->
link(Pid),
- {noreply, S};
+ noreply(S);
-handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid ->
+handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
{stop, Reason, S};
-handle_info(Info = {'EXIT', Pid, _Reason}, S) ->
- OldSubscribers = S#state.subscribers,
+handle_info(Info = {'EXIT', Pid, Reason}, S) ->
+ OldSubscribers = S#state.subscribers,
case lists:member(Pid, OldSubscribers) of
- true ->
- S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
- {noreply, S2};
+ true when Reason =:= shutdown ->
+ try
+ S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
+ noreply(S2)
+ catch
+ throw:{stop, R} ->
+ opt_unlink(S#state.parent_pid),
+ {stop, R, S}
+ end;
+ true ->
+ opt_unlink(S#state.parent_pid),
+ {stop, Reason, S};
false ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
- {noreply, S}
+ noreply(S)
end;
handle_info(Info, S) ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
- {noreply, S}.
+ noreply(S).
listen_on_trace_port(Node, Port, S) ->
[_Name, Host] = string:tokens(atom_to_list(Node), [$@]),
@@ -1075,20 +1129,17 @@ listen_on_trace_port(Node, Port, S) ->
{trace_client_pid, RemotePid} ->
rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
link(RemotePid),
- S2 = S#state{trace_nodes = [Node | S#state.trace_nodes],
- trace_port = Port + 1},
- {noreply, S2};
- {'EXIT', Reason} when Reason == already_started->
+ S#state{trace_nodes = [Node | S#state.trace_nodes],
+ trace_port = Port + 1};
+ {'EXIT', Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n",
[?MODULE, self(), Node, Port, Reason]),
- S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ S#state{trace_port = Port + 1};
{'EXIT', Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
- S2 = S#state{trace_port = Port + 1},
- {noreply, S2}
+ S#state{trace_port = Port + 1}
end.
%%----------------------------------------------------------------------
@@ -1120,7 +1171,7 @@ do_clear_table(S) ->
NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
S#state{event_tab = NewTab}.
-do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) ->
+do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) ->
OldSubscribers = S#state.subscribers,
NewSubscribers =
case lists:member(Pid, OldSubscribers) of
@@ -1133,6 +1184,8 @@ do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pi
[Pid | OldSubscribers]
end,
do_multicast(NewSubscribers, Msg),
+ Size = ets:info(S#state.event_tab, size),
+ do_multicast(NewSubscribers, {more_events, Size}),
ets:insert(S#state.dict_tab, {Key, Val}),
S#state{subscribers = NewSubscribers};
do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->
@@ -1147,11 +1200,18 @@ do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->
case lists:member(Pid, OldSubscribers) of
true ->
unlink(Pid),
- S#state{subscribers = OldSubscribers -- [Pid]};
+ S2 = S#state{subscribers = OldSubscribers -- [Pid]},
+ if
+ S2#state.auto_shutdown,
+ S2#state.subscribers =:= [] ->
+ throw({stop, shutdown});
+ true ->
+ S2
+ end;
false ->
S
end;
-do_dict_delete({dict_delete, {filter, collector}}, S) ->
+do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
S;
do_dict_delete(Msg = {dict_delete, Key}, S) ->
do_multicast(S#state.subscribers, Msg),
@@ -1202,3 +1262,33 @@ do_multicast([Pid | Pids], Msg) ->
do_multicast(Pids, Msg);
do_multicast([], _Msg) ->
ok.
+
+opt_unlink(Pid) ->
+ if
+ Pid =:= undefined ->
+ ignore;
+ true ->
+ unlink(Pid)
+ end.
+
+reply(Reply, #state{subscribers = []} = S) ->
+ {reply, Reply, S};
+reply(Reply, S) ->
+ {reply, Reply, S, 500}.
+
+noreply(#state{subscribers = []} = S) ->
+ {noreply, S};
+noreply(S) ->
+ {noreply, S, 500}.
+
+check_size(S) ->
+ Size = ets:info(S#state.event_tab, size),
+ if
+ Size =:= S#state.event_tab_size ->
+ S;
+ true ->
+ %% Tell the subscribers that more events are available
+ Msg = {more_events, Size},
+ do_multicast(S#state.subscribers, Msg),
+ S#state{event_tab_size = Size}
+ end.