diff options
author | HÃ¥kan Mattsson <[email protected]> | 2010-02-03 08:59:06 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2010-02-03 08:59:06 +0000 |
commit | 43f3482adf5eee657e5ba922733dfff6600c4e14 (patch) | |
tree | 7ea7b32a171de1a7690102c403a8a946e8a382a8 /lib/et/src/et_collector.erl | |
parent | 768da5a5f6312496b9b8a09cca5ea1d6b89a2c1c (diff) | |
download | otp-43f3482adf5eee657e5ba922733dfff6600c4e14.tar.gz otp-43f3482adf5eee657e5ba922733dfff6600c4e14.tar.bz2 otp-43f3482adf5eee657e5ba922733dfff6600c4e14.zip |
OTP-8058 The GUI parts are rewritten to use wxWidgets. Thanks Olle
Mattsson!
For the time being it is still possible to use the old GS based
version of the tool, but it is deprecated. The wxWidgets based
version is started by default.
A new tutorial has been added to the documentation. It is based
on Jayson Vantuyl's article
http://souja.net/2009/04/making-sense-of-erlangs-event-tracer.htm
l.
The functions et:trace_me/4 and et:trace_me/5 has been introduced
in order to replace the deprecated functions et:report_event/4
and et:report_event/5. Hopefully the new names makes it a little
more obvious what the intended usage of the functions are.
A print function has been added to the GUI, in order to enable
printing of sequence charts.
More functionality for hiding unwanted events has been added to
the GUI.
The max_events, hide_unknown and display_mode configuration
parameters to et_viewer is not used any more. Now the event cache
in the Viewer only contains those events that actually are
displayed in the GUI.
Diffstat (limited to 'lib/et/src/et_collector.erl')
-rw-r--r-- | lib/et/src/et_collector.erl | 330 |
1 files changed, 210 insertions, 120 deletions
diff --git a/lib/et/src/et_collector.erl b/lib/et/src/et_collector.erl index ea23c188f7..289537541d 100644 --- a/lib/et/src/et_collector.erl +++ b/lib/et/src/et_collector.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2000-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 2000-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% %%---------------------------------------------------------------------- @@ -36,6 +36,7 @@ iterate/3, iterate/5, + lookup/2, start_trace_client/3, start_trace_port/1, @@ -45,6 +46,7 @@ get_global_pid/0, %% get_table_handle/1, + get_table_size/1, change_pattern/2, make_key/2, @@ -59,9 +61,12 @@ -export([init/1,terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-include("et_internal.hrl"). -include("../include/et.hrl"). -record(state, {parent_pid, + auto_shutdown, % Optionally shutdown when the last subscriber dies + event_tab_size, event_tab, dict_tab, event_order, @@ -102,7 +107,7 @@ %% stored by the collector. By replacing the default collector filter %% with a customized dito it is possible to allow any trace data as %% input. The collector filter is a dictionary entry with the -%% predefined key {filter, collector} and the value is a fun of +%% predefined key {filter, all} and the value is a fun of %% arity 1. See et_selector:parse_event/2 for interface details, %% such as which erlang:trace/1 tuples that are accepted. %% @@ -126,7 +131,7 @@ %% option() = %% {parent_pid, pid()} | %% {event_order, event_order()} | -%% {dict_insert, {filter, collector}, collector_fun()} | +%% {dict_insert, {filter, all}, collector_fun()} | %% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} | %% {dict_insert, {subscriber, pid()}, dict_val()} | %% {dict_insert, dict_key(), dict_val()} | @@ -159,19 +164,16 @@ start_link(Options) -> case parse_opt(Options, default_state(), [], []) of - {ok, S, Dict2, Clients} when S#state.trace_global == false -> - case gen_server:start_link(?MODULE, [S, Dict2], []) of - {ok, Pid} when S#state.parent_pid /= self() -> - unlink(Pid), - start_clients(Pid, Clients); - {ok,Pid} -> - start_clients(Pid, Clients); - {error, Reason} -> - {error, Reason} - end; - {ok, S, Dict2, Clients} when S#state.trace_global == true -> - case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of - {ok, Pid} when S#state.parent_pid /= self() -> + {ok, S, Dict2, Clients} -> + Res = + case S#state.trace_global of + false -> + gen_server:start_link(?MODULE, [S, Dict2], []); + true -> + gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) + end, + case Res of + {ok, Pid} when S#state.parent_pid =/= self() -> unlink(Pid), start_clients(Pid, Clients); {ok,Pid} -> @@ -185,6 +187,7 @@ start_link(Options) -> default_state() -> #state{parent_pid = self(), + auto_shutdown = false, event_order = trace_ts, subscribers = [], trace_global = false, @@ -196,28 +199,30 @@ default_state() -> parse_opt([], S, Dict, Clients) -> {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern), Fun = fun(E) -> et_selector:parse_event(Mod, E) end, - Default = {dict_insert, {filter, collector}, Fun}, + Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun}, {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients}; parse_opt([H | T], S, Dict, Clients) -> case H of - {parent_pid, Parent} when Parent == undefined -> + {parent_pid, Parent} when Parent =:= undefined -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); - {parent_pid, Parent} when pid(Parent) -> + {parent_pid, Parent} when is_pid(Parent) -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); - {event_order, Order} when Order == trace_ts -> + {auto_shutdown, Bool} when Bool =:= true; Bool =:= false -> + parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients); + {event_order, Order} when Order =:= trace_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); - {event_order, Order} when Order == event_ts -> + {event_order, Order} when Order =:= event_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); {dict_insert, {filter, Name}, Fun} -> if - atom(Name), function(Fun) -> + is_atom(Name), is_function(Fun) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} end; {dict_insert, {subscriber, Pid}, _Val} -> if - pid(Pid) -> + is_pid(Pid) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} @@ -228,17 +233,17 @@ parse_opt([H | T], S, Dict, Clients) -> parse_opt(T, S, Dict ++ [H], Clients); {trace_client, Client = {_, _}} -> parse_opt(T, S, Dict, Clients ++ [Client]); - {trace_global, Bool} when Bool == false -> + {trace_global, Bool} when Bool =:= false -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); - {trace_global, Bool} when Bool == true -> + {trace_global, Bool} when Bool =:= true -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); - {trace_pattern, {Mod, _} = Pattern} when atom(Mod) -> + {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); {trace_pattern, undefined = Pattern} -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); - {trace_port, Port} when integer(Port) -> + {trace_port, Port} when is_integer(Port) -> parse_opt(T, S#state{trace_port = Port}, Dict, Clients); - {trace_max_queue, MaxQueue} when integer(MaxQueue) -> + {trace_max_queue, MaxQueue} when is_integer(MaxQueue) -> parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients); Bad -> {error, {bad_option, Bad}} @@ -352,19 +357,19 @@ do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) -> %% Returns: {ok, Continuation} | exit(Reason) %%---------------------------------------------------------------------- -report(CollectorPid, TraceOrEvent) when pid(CollectorPid) -> +report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) -> case get_table_handle(CollectorPid) of - {ok, TH} when record(TH, table_handle) -> + {ok, TH} when is_record(TH, table_handle) -> report(TH, TraceOrEvent); {error, Reason} -> exit(Reason) end; -report(TH, TraceOrEvent) when record(TH, table_handle) -> +report(TH, TraceOrEvent) when is_record(TH, table_handle) -> Fun = TH#table_handle.filter, case Fun(TraceOrEvent) of false -> {ok, TH}; - true when record(TraceOrEvent, event) -> + true when is_record(TraceOrEvent, event) -> Key = make_key(TH, TraceOrEvent), case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of true -> @@ -373,7 +378,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) -> %% Refresh the report handle and try again report(TH#table_handle.collector_pid, TraceOrEvent) end; - {true, Event} when record(Event, event) -> + {true, Event} when is_record(Event, event) -> Key = make_key(TH, Event), case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of true -> @@ -401,7 +406,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) -> report(TH#table_handle.collector_pid, TraceOrEvent) end end; -report(TH, end_of_trace) when record(TH, table_handle) -> +report(TH, end_of_trace) when is_record(TH, table_handle) -> {ok, TH}; report(_, Bad) -> exit({bad_event, Bad}). @@ -410,7 +415,7 @@ report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) -> report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents). report_event(CollectorPid, DetailLevel, From, To, Label, Contents) - when integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100, list(Contents) -> + when is_integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100 -> TS= erlang:now(), E = #event{detail_level = DetailLevel, trace_ts = TS, @@ -431,32 +436,38 @@ report_event(CollectorPid, DetailLevel, From, To, Label, Contents) %% Key = record(event_ts) | record(trace_ts) %%---------------------------------------------------------------------- -make_key(TH, Stuff) when record(TH, table_handle) -> +make_key(TH, Stuff) when is_record(TH, table_handle) -> make_key(TH#table_handle.event_order, Stuff); make_key(trace_ts, Stuff) -> if - record(Stuff, event) -> + is_record(Stuff, event) -> #event{trace_ts = R, event_ts = P} = Stuff, #trace_ts{trace_ts = R, event_ts = P}; - record(Stuff, trace_ts) -> + is_record(Stuff, trace_ts) -> Stuff; - record(Stuff, event_ts) -> + is_record(Stuff, event_ts) -> #event_ts{trace_ts = R, event_ts = P} = Stuff, #trace_ts{trace_ts = R, event_ts = P} end; make_key(event_ts, Stuff) -> if - record(Stuff, event) -> + is_record(Stuff, event) -> #event{trace_ts = R, event_ts = P} = Stuff, #event_ts{trace_ts = R, event_ts = P}; - record(Stuff, event_ts) -> + is_record(Stuff, event_ts) -> Stuff; - record(Stuff, trace_ts) -> + is_record(Stuff, trace_ts) -> #trace_ts{trace_ts = R, event_ts = P} = Stuff, #event_ts{trace_ts = R, event_ts = P} end. %%---------------------------------------------------------------------- +%%---------------------------------------------------------------------- + +get_table_size(CollectorPid) when is_pid(CollectorPid) -> + call(CollectorPid, get_table_size). + +%%---------------------------------------------------------------------- %% get_table_handle(CollectorPid) -> Handle %% %% Return a table handle @@ -465,7 +476,7 @@ make_key(event_ts, Stuff) -> %% Handle = record(table_handle) %%---------------------------------------------------------------------- -get_table_handle(CollectorPid) when pid(CollectorPid) -> +get_table_handle(CollectorPid) when is_pid(CollectorPid) -> call(CollectorPid, get_table_handle). %%---------------------------------------------------------------------- @@ -480,7 +491,7 @@ get_table_handle(CollectorPid) when pid(CollectorPid) -> get_global_pid() -> case global:whereis_name(?MODULE) of - CollectorPid when pid(CollectorPid) -> + CollectorPid when is_pid(CollectorPid) -> CollectorPid; undefined -> exit(global_collector_not_started) @@ -505,7 +516,7 @@ change_pattern(CollectorPid, RawPattern) -> call(CollectorPid, {change_pattern, Pattern}). %%---------------------------------------------------------------------- -%% dict_insert(CollectorPid, {filter, collector}, FilterFun) -> ok +%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok %% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok %% dict_insert(CollectorPid, Key, Val) -> ok %% @@ -532,14 +543,14 @@ change_pattern(CollectorPid, RawPattern) -> dict_insert(CollectorPid, Key = {filter, Name}, Fun) -> if - atom(Name), function(Fun) -> + is_atom(Name), is_function(Fun) -> call(CollectorPid, {dict_insert, Key, Fun}); true -> exit({badarg, Key}) end; dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) -> if - pid(Pid) -> + is_pid(Pid) -> call(CollectorPid, {dict_insert, Key, Val}); true -> exit({badarg, Key}) @@ -626,9 +637,9 @@ multicast(CollectorPid, Msg) -> %% Pid = dbg_trace_client_pid() %%---------------------------------------------------------------------- -start_trace_client(CollectorPid, Type, FileName) when Type == event_file -> +start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file -> load_event_file(CollectorPid, FileName); -start_trace_client(CollectorPid, Type, FileName) when Type == file -> +start_trace_client(CollectorPid, Type, FileName) when Type =:= file -> WaitFor = {make_ref(), end_of_trace}, EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end, EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end, @@ -658,9 +669,9 @@ start_trace_client(CollectorPid, Type, Parameters) -> {trace_client_pid, Pid}. trace_spec_wrapper(EventFun, EndFun, EventInitialAcc) - when function(EventFun), function(EndFun) -> + when is_function(EventFun), is_function(EndFun) -> {fun(Trace, Acc) -> - case Trace == end_of_trace of + case Trace =:= end_of_trace of true -> EndFun(Acc); false -> EventFun(Trace, Acc) end @@ -702,24 +713,24 @@ iterate(Handle, Prev, Limit) -> %% Acc = NewAcc = term() %%---------------------------------------------------------------------- -iterate(_, _, Limit, _, Acc) when Limit == 0 -> +iterate(_, _, Limit, _, Acc) when Limit =:= 0 -> Acc; -iterate(CollectorPid, Prev, Limit, Fun, Acc) when pid(CollectorPid) -> +iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) -> case get_table_handle(CollectorPid) of - {ok, TH} when record(TH, table_handle) -> + {ok, TH} when is_record(TH, table_handle) -> iterate(TH, Prev, Limit, Fun, Acc); {error, Reason} -> exit(Reason) end; -iterate(TH, Prev, Limit, Fun, Acc) when record(TH, table_handle) -> +iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) -> if - Limit == infinity -> + Limit =:= infinity -> next_iterate(TH, Prev, Limit, Fun, Acc); - integer(Limit), Limit > 0 -> + is_integer(Limit), Limit > 0 -> next_iterate(TH, Prev, Limit, Fun, Acc); - Limit == '-infinity' -> + Limit =:= '-infinity' -> prev_iterate(TH, Prev, Limit, Fun, Acc); - integer(Limit), Limit < 0 -> + is_integer(Limit), Limit < 0 -> prev_iterate(TH, Prev, Limit, Fun, Acc) end. @@ -793,7 +804,7 @@ prev_iterate(TH, Prev, Limit, Fun, Acc) -> lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc) end. -lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun == undefined -> +lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined -> Limit2 = incr(Limit, Incr), iterate(TH, Next, Limit2, Fun, Next); lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> @@ -801,17 +812,33 @@ lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> case catch ets:lookup_element(Tab, Next, 2) of {'EXIT', _} -> iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); - E when record(E, event) -> + E when is_record(E, event) -> Acc2 = Fun(E, Acc), Limit2 = incr(Limit, Incr), iterate(TH, Next, Limit2, Fun, Acc2) end. +lookup(CollectorPid, Key) when is_pid(CollectorPid) -> + case get_table_handle(CollectorPid) of + {ok, TH} when is_record(TH, table_handle) -> + lookup(TH, Key); + {error, Reason} -> + {error, Reason} + end; +lookup(TH, Key) when is_record(TH, table_handle) -> + Tab = TH#table_handle.event_tab, + case catch ets:lookup_element(Tab, Key, 2) of + {'EXIT', _} -> + {error, enoent}; + E when is_record(E, event) -> + {ok, E} + end. + incr(Val, Incr) -> if - Val == infinity -> Val; - Val == '-infinity' -> Val; - integer(Val) -> Val + Incr + Val =:= infinity -> Val; + Val =:= '-infinity' -> Val; + is_integer(Val) -> Val + Incr end. %%---------------------------------------------------------------------- @@ -824,13 +851,18 @@ incr(Val, Incr) -> %% table_handle() = record(table_handle) %%---------------------------------------------------------------------- -clear_table(CollectorPid) when pid(CollectorPid) -> +clear_table(CollectorPid) when is_pid(CollectorPid) -> call(CollectorPid, clear_table); -clear_table(TH) when record(TH, table_handle) -> +clear_table(TH) when is_record(TH, table_handle) -> clear_table(TH#table_handle.collector_pid). call(CollectorPid, Request) -> - gen_server:call(CollectorPid, Request, infinity). + try + gen_server:call(CollectorPid, Request, infinity) + catch + exit:{noproc,_} -> + {error, no_collector} + end. %%%---------------------------------------------------------------------- %%% Callback functions from gen_server @@ -849,7 +881,7 @@ init([InitialS, Dict]) -> case InitialS#state.parent_pid of undefined -> ignore; - Pid when pid(Pid) -> + Pid when is_pid(Pid) -> link(Pid) end, Funs = [fun init_tables/1, @@ -860,7 +892,7 @@ init([InitialS, Dict]) -> init_tables(S) -> EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]), - S#state{event_tab = EventTab, dict_tab = DictTab}. + S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}. init_global(S) -> case S#state.trace_global of @@ -889,44 +921,53 @@ init_global(S) -> handle_call({multicast, Msg}, _From, S) -> do_multicast(S#state.subscribers, Msg), - {reply, ok, S}; + reply(ok, S); handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) -> S2 = do_dict_insert(Msg, S), - {reply, ok, S2}; + reply(ok, S2); handle_call(Msg = {dict_delete, _Key}, _From, S) -> - S2 = do_dict_delete(Msg, S), - {reply, ok, S2}; - + try + S2 = do_dict_delete(Msg, S), + reply(ok, S2) + catch + throw:{stop, R} -> + opt_unlink(S#state.parent_pid), + {stop, R, S} + end; handle_call({dict_lookup, Key}, _From, S) -> Reply = ets:lookup(S#state.dict_tab, Key), - {reply, Reply, S}; + reply(Reply, S); handle_call({dict_match, Pattern}, _From, S) -> case catch ets:match_object(S#state.dict_tab, Pattern) of {'EXIT', _Reason} -> - {reply, [], S}; + reply([], S); Matching -> - {reply, Matching, S} + reply(Matching, S) end; handle_call(get_table_handle, _From, S) -> - [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}), + [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}), TH = #table_handle{collector_pid = self(), event_tab = S#state.event_tab, event_order = S#state.event_order, filter = TableFilter}, - {reply, {ok, TH}, S}; + reply({ok, TH}, S); + +handle_call(get_table_size, _From, S) -> + Size = ets:info(S#state.event_tab, size), + reply({ok, Size}, S); handle_call(close, _From, S) -> case S#state.file of undefined -> - {reply, {error, file_not_open}, S}; + reply({error, file_not_open}, S); F -> Reply = disk_log:close(F#file.desc), S2 = S#state{file = undefined}, - {reply, Reply, S2} + reply(Reply, S2) end; handle_call({save_event_file, FileName, Options}, _From, S) -> Default = #file{name = FileName, @@ -934,7 +975,7 @@ handle_call({save_event_file, FileName, Options}, _From, S) -> file_opt = write, table_opt = keep}, case parse_file_options(Default, Options) of - {ok, F} when record(F, file) -> + {ok, F} when is_record(F, file) -> case file_open(F) of {ok, Fd} -> F2 = F#file{desc = Fd}, @@ -966,16 +1007,16 @@ handle_call({save_event_file, FileName, Options}, _From, S) -> end, case F2#file.table_opt of keep -> - {reply, Reply2, S3}; + reply(Reply2, S3); clear -> S4 = do_clear_table(S3), - {reply, Reply2, S4} + reply(Reply2, S4) end; {error, Reason} -> - {reply, {error, {file_open, Reason}}, S} + reply({error, {file_open, Reason}}, S) end; {error, Reason} -> - {reply, {error, Reason}, S} + reply({error, Reason}, S) end; handle_call({change_pattern, Pattern}, _From, S) -> @@ -983,11 +1024,11 @@ handle_call({change_pattern, Pattern}, _From, S) -> rpc:multicall(Ns, et_selector, change_pattern, [Pattern]), Reply = {old_pattern, S#state.trace_pattern}, S2 = S#state{trace_pattern = Pattern}, - {reply, Reply, S2}; + reply(Reply, S2); handle_call(clear_table, _From, S) -> S2 = do_clear_table(S), - {reply, ok, S2}; + reply(ok, S2); handle_call(stop, _From, S) -> do_multicast(S#state.subscribers, close), @@ -999,7 +1040,7 @@ handle_call(stop, _From, S) -> handle_call(Request, From, S) -> ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n", [?MODULE, self(), Request, From, S]), - {reply, {error, {bad_request, Request}}, S}. + reply({error, {bad_request, Request}}, S). %%---------------------------------------------------------------------- %% Func: handle_cast/2 @@ -1011,7 +1052,7 @@ handle_call(Request, From, S) -> handle_cast(Msg, S) -> ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n", [?MODULE, self(), Msg, S]), - {noreply, S}. + noreply(S). %%---------------------------------------------------------------------- %% Func: handle_info/2 @@ -1020,54 +1061,67 @@ handle_cast(Msg, S) -> %% {stop, Reason, State} (terminate/2 is called) %%---------------------------------------------------------------------- +handle_info(timeout, S) -> + S2 = check_size(S), + noreply(S2); handle_info({nodeup, Node}, S) -> Port = S#state.trace_port, MaxQueue = S#state.trace_max_queue, case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of {ok, _} -> - listen_on_trace_port(Node, Port, S); - {error, Reason} when Reason == already_started-> + S2 = listen_on_trace_port(Node, Port, S), + noreply(S2); + {error, Reason} when Reason =:= already_started-> ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + noreply(S2); {badrpc, Reason} -> ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + noreply(S2); {error, Reason} -> self() ! {nodeup, Node}, ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2} + noreply(S2) end; handle_info({nodedown, Node}, S) -> - {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}}; + noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]}); handle_info({register_trace_client, Pid}, S) -> link(Pid), - {noreply, S}; + noreply(S); -handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid -> +handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid -> {stop, Reason, S}; -handle_info(Info = {'EXIT', Pid, _Reason}, S) -> - OldSubscribers = S#state.subscribers, +handle_info(Info = {'EXIT', Pid, Reason}, S) -> + OldSubscribers = S#state.subscribers, case lists:member(Pid, OldSubscribers) of - true -> - S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), - {noreply, S2}; + true when Reason =:= shutdown -> + try + S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), + noreply(S2) + catch + throw:{stop, R} -> + opt_unlink(S#state.parent_pid), + {stop, R, S} + end; + true -> + opt_unlink(S#state.parent_pid), + {stop, Reason, S}; false -> ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", [?MODULE, self(), Info, S]), - {noreply, S} + noreply(S) end; handle_info(Info, S) -> ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", [?MODULE, self(), Info, S]), - {noreply, S}. + noreply(S). listen_on_trace_port(Node, Port, S) -> [_Name, Host] = string:tokens(atom_to_list(Node), [$@]), @@ -1075,20 +1129,17 @@ listen_on_trace_port(Node, Port, S) -> {trace_client_pid, RemotePid} -> rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]), link(RemotePid), - S2 = S#state{trace_nodes = [Node | S#state.trace_nodes], - trace_port = Port + 1}, - {noreply, S2}; - {'EXIT', Reason} when Reason == already_started-> + S#state{trace_nodes = [Node | S#state.trace_nodes], + trace_port = Port + 1}; + {'EXIT', Reason} when Reason =:= already_started-> ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n", [?MODULE, self(), Node, Port, Reason]), - S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + S#state{trace_port = Port + 1}; {'EXIT', Reason} -> self() ! {nodeup, Node}, ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), - S2 = S#state{trace_port = Port + 1}, - {noreply, S2} + S#state{trace_port = Port + 1} end. %%---------------------------------------------------------------------- @@ -1120,7 +1171,7 @@ do_clear_table(S) -> NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), S#state{event_tab = NewTab}. -do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) -> +do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) -> OldSubscribers = S#state.subscribers, NewSubscribers = case lists:member(Pid, OldSubscribers) of @@ -1133,6 +1184,8 @@ do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pi [Pid | OldSubscribers] end, do_multicast(NewSubscribers, Msg), + Size = ets:info(S#state.event_tab, size), + do_multicast(NewSubscribers, {more_events, Size}), ets:insert(S#state.dict_tab, {Key, Val}), S#state{subscribers = NewSubscribers}; do_dict_insert(Msg = {dict_insert, Key, Val}, S) -> @@ -1147,11 +1200,18 @@ do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) -> case lists:member(Pid, OldSubscribers) of true -> unlink(Pid), - S#state{subscribers = OldSubscribers -- [Pid]}; + S2 = S#state{subscribers = OldSubscribers -- [Pid]}, + if + S2#state.auto_shutdown, + S2#state.subscribers =:= [] -> + throw({stop, shutdown}); + true -> + S2 + end; false -> S end; -do_dict_delete({dict_delete, {filter, collector}}, S) -> +do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) -> S; do_dict_delete(Msg = {dict_delete, Key}, S) -> do_multicast(S#state.subscribers, Msg), @@ -1202,3 +1262,33 @@ do_multicast([Pid | Pids], Msg) -> do_multicast(Pids, Msg); do_multicast([], _Msg) -> ok. + +opt_unlink(Pid) -> + if + Pid =:= undefined -> + ignore; + true -> + unlink(Pid) + end. + +reply(Reply, #state{subscribers = []} = S) -> + {reply, Reply, S}; +reply(Reply, S) -> + {reply, Reply, S, 500}. + +noreply(#state{subscribers = []} = S) -> + {noreply, S}; +noreply(S) -> + {noreply, S, 500}. + +check_size(S) -> + Size = ets:info(S#state.event_tab, size), + if + Size =:= S#state.event_tab_size -> + S; + true -> + %% Tell the subscribers that more events are available + Msg = {more_events, Size}, + do_multicast(S#state.subscribers, Msg), + S#state{event_tab_size = Size} + end. |