aboutsummaryrefslogtreecommitdiffstats
path: root/lib/et/src/et_collector.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/et/src/et_collector.erl')
-rw-r--r--lib/et/src/et_collector.erl330
1 files changed, 210 insertions, 120 deletions
diff --git a/lib/et/src/et_collector.erl b/lib/et/src/et_collector.erl
index ea23c188f7..289537541d 100644
--- a/lib/et/src/et_collector.erl
+++ b/lib/et/src/et_collector.erl
@@ -1,19 +1,19 @@
%%
%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 2000-2009. All Rights Reserved.
-%%
+%%
+%% Copyright Ericsson AB 2000-2010. All Rights Reserved.
+%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
-%%
+%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
-%%
+%%
%% %CopyrightEnd%
%%
%%----------------------------------------------------------------------
@@ -36,6 +36,7 @@
iterate/3,
iterate/5,
+ lookup/2,
start_trace_client/3,
start_trace_port/1,
@@ -45,6 +46,7 @@
get_global_pid/0,
%% get_table_handle/1,
+ get_table_size/1,
change_pattern/2,
make_key/2,
@@ -59,9 +61,12 @@
-export([init/1,terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2]).
+-include("et_internal.hrl").
-include("../include/et.hrl").
-record(state, {parent_pid,
+ auto_shutdown, % Optionally shutdown when the last subscriber dies
+ event_tab_size,
event_tab,
dict_tab,
event_order,
@@ -102,7 +107,7 @@
%% stored by the collector. By replacing the default collector filter
%% with a customized dito it is possible to allow any trace data as
%% input. The collector filter is a dictionary entry with the
-%% predefined key {filter, collector} and the value is a fun of
+%% predefined key {filter, all} and the value is a fun of
%% arity 1. See et_selector:parse_event/2 for interface details,
%% such as which erlang:trace/1 tuples that are accepted.
%%
@@ -126,7 +131,7 @@
%% option() =
%% {parent_pid, pid()} |
%% {event_order, event_order()} |
-%% {dict_insert, {filter, collector}, collector_fun()} |
+%% {dict_insert, {filter, all}, collector_fun()} |
%% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} |
%% {dict_insert, {subscriber, pid()}, dict_val()} |
%% {dict_insert, dict_key(), dict_val()} |
@@ -159,19 +164,16 @@
start_link(Options) ->
case parse_opt(Options, default_state(), [], []) of
- {ok, S, Dict2, Clients} when S#state.trace_global == false ->
- case gen_server:start_link(?MODULE, [S, Dict2], []) of
- {ok, Pid} when S#state.parent_pid /= self() ->
- unlink(Pid),
- start_clients(Pid, Clients);
- {ok,Pid} ->
- start_clients(Pid, Clients);
- {error, Reason} ->
- {error, Reason}
- end;
- {ok, S, Dict2, Clients} when S#state.trace_global == true ->
- case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of
- {ok, Pid} when S#state.parent_pid /= self() ->
+ {ok, S, Dict2, Clients} ->
+ Res =
+ case S#state.trace_global of
+ false ->
+ gen_server:start_link(?MODULE, [S, Dict2], []);
+ true ->
+ gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], [])
+ end,
+ case Res of
+ {ok, Pid} when S#state.parent_pid =/= self() ->
unlink(Pid),
start_clients(Pid, Clients);
{ok,Pid} ->
@@ -185,6 +187,7 @@ start_link(Options) ->
default_state() ->
#state{parent_pid = self(),
+ auto_shutdown = false,
event_order = trace_ts,
subscribers = [],
trace_global = false,
@@ -196,28 +199,30 @@ default_state() ->
parse_opt([], S, Dict, Clients) ->
{Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern),
Fun = fun(E) -> et_selector:parse_event(Mod, E) end,
- Default = {dict_insert, {filter, collector}, Fun},
+ Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun},
{ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients};
parse_opt([H | T], S, Dict, Clients) ->
case H of
- {parent_pid, Parent} when Parent == undefined ->
+ {parent_pid, Parent} when Parent =:= undefined ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
- {parent_pid, Parent} when pid(Parent) ->
+ {parent_pid, Parent} when is_pid(Parent) ->
parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients);
- {event_order, Order} when Order == trace_ts ->
+ {auto_shutdown, Bool} when Bool =:= true; Bool =:= false ->
+ parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients);
+ {event_order, Order} when Order =:= trace_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
- {event_order, Order} when Order == event_ts ->
+ {event_order, Order} when Order =:= event_ts ->
parse_opt(T, S#state{event_order = Order}, Dict, Clients);
{dict_insert, {filter, Name}, Fun} ->
if
- atom(Name), function(Fun) ->
+ is_atom(Name), is_function(Fun) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
end;
{dict_insert, {subscriber, Pid}, _Val} ->
if
- pid(Pid) ->
+ is_pid(Pid) ->
parse_opt(T, S, Dict ++ [H], Clients);
true ->
{error, {bad_option, H}}
@@ -228,17 +233,17 @@ parse_opt([H | T], S, Dict, Clients) ->
parse_opt(T, S, Dict ++ [H], Clients);
{trace_client, Client = {_, _}} ->
parse_opt(T, S, Dict, Clients ++ [Client]);
- {trace_global, Bool} when Bool == false ->
+ {trace_global, Bool} when Bool =:= false ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
- {trace_global, Bool} when Bool == true ->
+ {trace_global, Bool} when Bool =:= true ->
parse_opt(T, S#state{trace_global = Bool}, Dict, Clients);
- {trace_pattern, {Mod, _} = Pattern} when atom(Mod) ->
+ {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
{trace_pattern, undefined = Pattern} ->
parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients);
- {trace_port, Port} when integer(Port) ->
+ {trace_port, Port} when is_integer(Port) ->
parse_opt(T, S#state{trace_port = Port}, Dict, Clients);
- {trace_max_queue, MaxQueue} when integer(MaxQueue) ->
+ {trace_max_queue, MaxQueue} when is_integer(MaxQueue) ->
parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients);
Bad ->
{error, {bad_option, Bad}}
@@ -352,19 +357,19 @@ do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) ->
%% Returns: {ok, Continuation} | exit(Reason)
%%----------------------------------------------------------------------
-report(CollectorPid, TraceOrEvent) when pid(CollectorPid) ->
+report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
- {ok, TH} when record(TH, table_handle) ->
+ {ok, TH} when is_record(TH, table_handle) ->
report(TH, TraceOrEvent);
{error, Reason} ->
exit(Reason)
end;
-report(TH, TraceOrEvent) when record(TH, table_handle) ->
+report(TH, TraceOrEvent) when is_record(TH, table_handle) ->
Fun = TH#table_handle.filter,
case Fun(TraceOrEvent) of
false ->
{ok, TH};
- true when record(TraceOrEvent, event) ->
+ true when is_record(TraceOrEvent, event) ->
Key = make_key(TH, TraceOrEvent),
case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of
true ->
@@ -373,7 +378,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) ->
%% Refresh the report handle and try again
report(TH#table_handle.collector_pid, TraceOrEvent)
end;
- {true, Event} when record(Event, event) ->
+ {true, Event} when is_record(Event, event) ->
Key = make_key(TH, Event),
case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of
true ->
@@ -401,7 +406,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) ->
report(TH#table_handle.collector_pid, TraceOrEvent)
end
end;
-report(TH, end_of_trace) when record(TH, table_handle) ->
+report(TH, end_of_trace) when is_record(TH, table_handle) ->
{ok, TH};
report(_, Bad) ->
exit({bad_event, Bad}).
@@ -410,7 +415,7 @@ report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) ->
report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents).
report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
- when integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100, list(Contents) ->
+ when is_integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100 ->
TS= erlang:now(),
E = #event{detail_level = DetailLevel,
trace_ts = TS,
@@ -431,32 +436,38 @@ report_event(CollectorPid, DetailLevel, From, To, Label, Contents)
%% Key = record(event_ts) | record(trace_ts)
%%----------------------------------------------------------------------
-make_key(TH, Stuff) when record(TH, table_handle) ->
+make_key(TH, Stuff) when is_record(TH, table_handle) ->
make_key(TH#table_handle.event_order, Stuff);
make_key(trace_ts, Stuff) ->
if
- record(Stuff, event) ->
+ is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P};
- record(Stuff, trace_ts) ->
+ is_record(Stuff, trace_ts) ->
Stuff;
- record(Stuff, event_ts) ->
+ is_record(Stuff, event_ts) ->
#event_ts{trace_ts = R, event_ts = P} = Stuff,
#trace_ts{trace_ts = R, event_ts = P}
end;
make_key(event_ts, Stuff) ->
if
- record(Stuff, event) ->
+ is_record(Stuff, event) ->
#event{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P};
- record(Stuff, event_ts) ->
+ is_record(Stuff, event_ts) ->
Stuff;
- record(Stuff, trace_ts) ->
+ is_record(Stuff, trace_ts) ->
#trace_ts{trace_ts = R, event_ts = P} = Stuff,
#event_ts{trace_ts = R, event_ts = P}
end.
%%----------------------------------------------------------------------
+%%----------------------------------------------------------------------
+
+get_table_size(CollectorPid) when is_pid(CollectorPid) ->
+ call(CollectorPid, get_table_size).
+
+%%----------------------------------------------------------------------
%% get_table_handle(CollectorPid) -> Handle
%%
%% Return a table handle
@@ -465,7 +476,7 @@ make_key(event_ts, Stuff) ->
%% Handle = record(table_handle)
%%----------------------------------------------------------------------
-get_table_handle(CollectorPid) when pid(CollectorPid) ->
+get_table_handle(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, get_table_handle).
%%----------------------------------------------------------------------
@@ -480,7 +491,7 @@ get_table_handle(CollectorPid) when pid(CollectorPid) ->
get_global_pid() ->
case global:whereis_name(?MODULE) of
- CollectorPid when pid(CollectorPid) ->
+ CollectorPid when is_pid(CollectorPid) ->
CollectorPid;
undefined ->
exit(global_collector_not_started)
@@ -505,7 +516,7 @@ change_pattern(CollectorPid, RawPattern) ->
call(CollectorPid, {change_pattern, Pattern}).
%%----------------------------------------------------------------------
-%% dict_insert(CollectorPid, {filter, collector}, FilterFun) -> ok
+%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok
%% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok
%% dict_insert(CollectorPid, Key, Val) -> ok
%%
@@ -532,14 +543,14 @@ change_pattern(CollectorPid, RawPattern) ->
dict_insert(CollectorPid, Key = {filter, Name}, Fun) ->
if
- atom(Name), function(Fun) ->
+ is_atom(Name), is_function(Fun) ->
call(CollectorPid, {dict_insert, Key, Fun});
true ->
exit({badarg, Key})
end;
dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) ->
if
- pid(Pid) ->
+ is_pid(Pid) ->
call(CollectorPid, {dict_insert, Key, Val});
true ->
exit({badarg, Key})
@@ -626,9 +637,9 @@ multicast(CollectorPid, Msg) ->
%% Pid = dbg_trace_client_pid()
%%----------------------------------------------------------------------
-start_trace_client(CollectorPid, Type, FileName) when Type == event_file ->
+start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file ->
load_event_file(CollectorPid, FileName);
-start_trace_client(CollectorPid, Type, FileName) when Type == file ->
+start_trace_client(CollectorPid, Type, FileName) when Type =:= file ->
WaitFor = {make_ref(), end_of_trace},
EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end,
EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end,
@@ -658,9 +669,9 @@ start_trace_client(CollectorPid, Type, Parameters) ->
{trace_client_pid, Pid}.
trace_spec_wrapper(EventFun, EndFun, EventInitialAcc)
- when function(EventFun), function(EndFun) ->
+ when is_function(EventFun), is_function(EndFun) ->
{fun(Trace, Acc) ->
- case Trace == end_of_trace of
+ case Trace =:= end_of_trace of
true -> EndFun(Acc);
false -> EventFun(Trace, Acc)
end
@@ -702,24 +713,24 @@ iterate(Handle, Prev, Limit) ->
%% Acc = NewAcc = term()
%%----------------------------------------------------------------------
-iterate(_, _, Limit, _, Acc) when Limit == 0 ->
+iterate(_, _, Limit, _, Acc) when Limit =:= 0 ->
Acc;
-iterate(CollectorPid, Prev, Limit, Fun, Acc) when pid(CollectorPid) ->
+iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) ->
case get_table_handle(CollectorPid) of
- {ok, TH} when record(TH, table_handle) ->
+ {ok, TH} when is_record(TH, table_handle) ->
iterate(TH, Prev, Limit, Fun, Acc);
{error, Reason} ->
exit(Reason)
end;
-iterate(TH, Prev, Limit, Fun, Acc) when record(TH, table_handle) ->
+iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) ->
if
- Limit == infinity ->
+ Limit =:= infinity ->
next_iterate(TH, Prev, Limit, Fun, Acc);
- integer(Limit), Limit > 0 ->
+ is_integer(Limit), Limit > 0 ->
next_iterate(TH, Prev, Limit, Fun, Acc);
- Limit == '-infinity' ->
+ Limit =:= '-infinity' ->
prev_iterate(TH, Prev, Limit, Fun, Acc);
- integer(Limit), Limit < 0 ->
+ is_integer(Limit), Limit < 0 ->
prev_iterate(TH, Prev, Limit, Fun, Acc)
end.
@@ -793,7 +804,7 @@ prev_iterate(TH, Prev, Limit, Fun, Acc) ->
lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc)
end.
-lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun == undefined ->
+lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined ->
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Next);
lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
@@ -801,17 +812,33 @@ lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) ->
case catch ets:lookup_element(Tab, Next, 2) of
{'EXIT', _} ->
iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc);
- E when record(E, event) ->
+ E when is_record(E, event) ->
Acc2 = Fun(E, Acc),
Limit2 = incr(Limit, Incr),
iterate(TH, Next, Limit2, Fun, Acc2)
end.
+lookup(CollectorPid, Key) when is_pid(CollectorPid) ->
+ case get_table_handle(CollectorPid) of
+ {ok, TH} when is_record(TH, table_handle) ->
+ lookup(TH, Key);
+ {error, Reason} ->
+ {error, Reason}
+ end;
+lookup(TH, Key) when is_record(TH, table_handle) ->
+ Tab = TH#table_handle.event_tab,
+ case catch ets:lookup_element(Tab, Key, 2) of
+ {'EXIT', _} ->
+ {error, enoent};
+ E when is_record(E, event) ->
+ {ok, E}
+ end.
+
incr(Val, Incr) ->
if
- Val == infinity -> Val;
- Val == '-infinity' -> Val;
- integer(Val) -> Val + Incr
+ Val =:= infinity -> Val;
+ Val =:= '-infinity' -> Val;
+ is_integer(Val) -> Val + Incr
end.
%%----------------------------------------------------------------------
@@ -824,13 +851,18 @@ incr(Val, Incr) ->
%% table_handle() = record(table_handle)
%%----------------------------------------------------------------------
-clear_table(CollectorPid) when pid(CollectorPid) ->
+clear_table(CollectorPid) when is_pid(CollectorPid) ->
call(CollectorPid, clear_table);
-clear_table(TH) when record(TH, table_handle) ->
+clear_table(TH) when is_record(TH, table_handle) ->
clear_table(TH#table_handle.collector_pid).
call(CollectorPid, Request) ->
- gen_server:call(CollectorPid, Request, infinity).
+ try
+ gen_server:call(CollectorPid, Request, infinity)
+ catch
+ exit:{noproc,_} ->
+ {error, no_collector}
+ end.
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
@@ -849,7 +881,7 @@ init([InitialS, Dict]) ->
case InitialS#state.parent_pid of
undefined ->
ignore;
- Pid when pid(Pid) ->
+ Pid when is_pid(Pid) ->
link(Pid)
end,
Funs = [fun init_tables/1,
@@ -860,7 +892,7 @@ init([InitialS, Dict]) ->
init_tables(S) ->
EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]),
- S#state{event_tab = EventTab, dict_tab = DictTab}.
+ S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}.
init_global(S) ->
case S#state.trace_global of
@@ -889,44 +921,53 @@ init_global(S) ->
handle_call({multicast, Msg}, _From, S) ->
do_multicast(S#state.subscribers, Msg),
- {reply, ok, S};
+ reply(ok, S);
handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) ->
S2 = do_dict_insert(Msg, S),
- {reply, ok, S2};
+ reply(ok, S2);
handle_call(Msg = {dict_delete, _Key}, _From, S) ->
- S2 = do_dict_delete(Msg, S),
- {reply, ok, S2};
-
+ try
+ S2 = do_dict_delete(Msg, S),
+ reply(ok, S2)
+ catch
+ throw:{stop, R} ->
+ opt_unlink(S#state.parent_pid),
+ {stop, R, S}
+ end;
handle_call({dict_lookup, Key}, _From, S) ->
Reply = ets:lookup(S#state.dict_tab, Key),
- {reply, Reply, S};
+ reply(Reply, S);
handle_call({dict_match, Pattern}, _From, S) ->
case catch ets:match_object(S#state.dict_tab, Pattern) of
{'EXIT', _Reason} ->
- {reply, [], S};
+ reply([], S);
Matching ->
- {reply, Matching, S}
+ reply(Matching, S)
end;
handle_call(get_table_handle, _From, S) ->
- [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}),
+ [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}),
TH = #table_handle{collector_pid = self(),
event_tab = S#state.event_tab,
event_order = S#state.event_order,
filter = TableFilter},
- {reply, {ok, TH}, S};
+ reply({ok, TH}, S);
+
+handle_call(get_table_size, _From, S) ->
+ Size = ets:info(S#state.event_tab, size),
+ reply({ok, Size}, S);
handle_call(close, _From, S) ->
case S#state.file of
undefined ->
- {reply, {error, file_not_open}, S};
+ reply({error, file_not_open}, S);
F ->
Reply = disk_log:close(F#file.desc),
S2 = S#state{file = undefined},
- {reply, Reply, S2}
+ reply(Reply, S2)
end;
handle_call({save_event_file, FileName, Options}, _From, S) ->
Default = #file{name = FileName,
@@ -934,7 +975,7 @@ handle_call({save_event_file, FileName, Options}, _From, S) ->
file_opt = write,
table_opt = keep},
case parse_file_options(Default, Options) of
- {ok, F} when record(F, file) ->
+ {ok, F} when is_record(F, file) ->
case file_open(F) of
{ok, Fd} ->
F2 = F#file{desc = Fd},
@@ -966,16 +1007,16 @@ handle_call({save_event_file, FileName, Options}, _From, S) ->
end,
case F2#file.table_opt of
keep ->
- {reply, Reply2, S3};
+ reply(Reply2, S3);
clear ->
S4 = do_clear_table(S3),
- {reply, Reply2, S4}
+ reply(Reply2, S4)
end;
{error, Reason} ->
- {reply, {error, {file_open, Reason}}, S}
+ reply({error, {file_open, Reason}}, S)
end;
{error, Reason} ->
- {reply, {error, Reason}, S}
+ reply({error, Reason}, S)
end;
handle_call({change_pattern, Pattern}, _From, S) ->
@@ -983,11 +1024,11 @@ handle_call({change_pattern, Pattern}, _From, S) ->
rpc:multicall(Ns, et_selector, change_pattern, [Pattern]),
Reply = {old_pattern, S#state.trace_pattern},
S2 = S#state{trace_pattern = Pattern},
- {reply, Reply, S2};
+ reply(Reply, S2);
handle_call(clear_table, _From, S) ->
S2 = do_clear_table(S),
- {reply, ok, S2};
+ reply(ok, S2);
handle_call(stop, _From, S) ->
do_multicast(S#state.subscribers, close),
@@ -999,7 +1040,7 @@ handle_call(stop, _From, S) ->
handle_call(Request, From, S) ->
ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n",
[?MODULE, self(), Request, From, S]),
- {reply, {error, {bad_request, Request}}, S}.
+ reply({error, {bad_request, Request}}, S).
%%----------------------------------------------------------------------
%% Func: handle_cast/2
@@ -1011,7 +1052,7 @@ handle_call(Request, From, S) ->
handle_cast(Msg, S) ->
ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n",
[?MODULE, self(), Msg, S]),
- {noreply, S}.
+ noreply(S).
%%----------------------------------------------------------------------
%% Func: handle_info/2
@@ -1020,54 +1061,67 @@ handle_cast(Msg, S) ->
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
+handle_info(timeout, S) ->
+ S2 = check_size(S),
+ noreply(S2);
handle_info({nodeup, Node}, S) ->
Port = S#state.trace_port,
MaxQueue = S#state.trace_max_queue,
case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of
{ok, _} ->
- listen_on_trace_port(Node, Port, S);
- {error, Reason} when Reason == already_started->
+ S2 = listen_on_trace_port(Node, Port, S),
+ noreply(S2);
+ {error, Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ noreply(S2);
{badrpc, Reason} ->
ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ noreply(S2);
{error, Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
S2 = S#state{trace_port = Port + 1},
- {noreply, S2}
+ noreply(S2)
end;
handle_info({nodedown, Node}, S) ->
- {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}};
+ noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]});
handle_info({register_trace_client, Pid}, S) ->
link(Pid),
- {noreply, S};
+ noreply(S);
-handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid ->
+handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid ->
{stop, Reason, S};
-handle_info(Info = {'EXIT', Pid, _Reason}, S) ->
- OldSubscribers = S#state.subscribers,
+handle_info(Info = {'EXIT', Pid, Reason}, S) ->
+ OldSubscribers = S#state.subscribers,
case lists:member(Pid, OldSubscribers) of
- true ->
- S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
- {noreply, S2};
+ true when Reason =:= shutdown ->
+ try
+ S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S),
+ noreply(S2)
+ catch
+ throw:{stop, R} ->
+ opt_unlink(S#state.parent_pid),
+ {stop, R, S}
+ end;
+ true ->
+ opt_unlink(S#state.parent_pid),
+ {stop, Reason, S};
false ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
- {noreply, S}
+ noreply(S)
end;
handle_info(Info, S) ->
ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n",
[?MODULE, self(), Info, S]),
- {noreply, S}.
+ noreply(S).
listen_on_trace_port(Node, Port, S) ->
[_Name, Host] = string:tokens(atom_to_list(Node), [$@]),
@@ -1075,20 +1129,17 @@ listen_on_trace_port(Node, Port, S) ->
{trace_client_pid, RemotePid} ->
rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]),
link(RemotePid),
- S2 = S#state{trace_nodes = [Node | S#state.trace_nodes],
- trace_port = Port + 1},
- {noreply, S2};
- {'EXIT', Reason} when Reason == already_started->
+ S#state{trace_nodes = [Node | S#state.trace_nodes],
+ trace_port = Port + 1};
+ {'EXIT', Reason} when Reason =:= already_started->
ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n",
[?MODULE, self(), Node, Port, Reason]),
- S2 = S#state{trace_port = Port + 1},
- {noreply, S2};
+ S#state{trace_port = Port + 1};
{'EXIT', Reason} ->
self() ! {nodeup, Node},
ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n",
[?MODULE, self(), Node, Port, Reason]),
- S2 = S#state{trace_port = Port + 1},
- {noreply, S2}
+ S#state{trace_port = Port + 1}
end.
%%----------------------------------------------------------------------
@@ -1120,7 +1171,7 @@ do_clear_table(S) ->
NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]),
S#state{event_tab = NewTab}.
-do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) ->
+do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) ->
OldSubscribers = S#state.subscribers,
NewSubscribers =
case lists:member(Pid, OldSubscribers) of
@@ -1133,6 +1184,8 @@ do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pi
[Pid | OldSubscribers]
end,
do_multicast(NewSubscribers, Msg),
+ Size = ets:info(S#state.event_tab, size),
+ do_multicast(NewSubscribers, {more_events, Size}),
ets:insert(S#state.dict_tab, {Key, Val}),
S#state{subscribers = NewSubscribers};
do_dict_insert(Msg = {dict_insert, Key, Val}, S) ->
@@ -1147,11 +1200,18 @@ do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) ->
case lists:member(Pid, OldSubscribers) of
true ->
unlink(Pid),
- S#state{subscribers = OldSubscribers -- [Pid]};
+ S2 = S#state{subscribers = OldSubscribers -- [Pid]},
+ if
+ S2#state.auto_shutdown,
+ S2#state.subscribers =:= [] ->
+ throw({stop, shutdown});
+ true ->
+ S2
+ end;
false ->
S
end;
-do_dict_delete({dict_delete, {filter, collector}}, S) ->
+do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) ->
S;
do_dict_delete(Msg = {dict_delete, Key}, S) ->
do_multicast(S#state.subscribers, Msg),
@@ -1202,3 +1262,33 @@ do_multicast([Pid | Pids], Msg) ->
do_multicast(Pids, Msg);
do_multicast([], _Msg) ->
ok.
+
+opt_unlink(Pid) ->
+ if
+ Pid =:= undefined ->
+ ignore;
+ true ->
+ unlink(Pid)
+ end.
+
+reply(Reply, #state{subscribers = []} = S) ->
+ {reply, Reply, S};
+reply(Reply, S) ->
+ {reply, Reply, S, 500}.
+
+noreply(#state{subscribers = []} = S) ->
+ {noreply, S};
+noreply(S) ->
+ {noreply, S, 500}.
+
+check_size(S) ->
+ Size = ets:info(S#state.event_tab, size),
+ if
+ Size =:= S#state.event_tab_size ->
+ S;
+ true ->
+ %% Tell the subscribers that more events are available
+ Msg = {more_events, Size},
+ do_multicast(S#state.subscribers, Msg),
+ S#state{event_tab_size = Size}
+ end.