diff options
Diffstat (limited to 'lib/et/src/et_collector.erl')
-rw-r--r-- | lib/et/src/et_collector.erl | 330 |
1 files changed, 210 insertions, 120 deletions
diff --git a/lib/et/src/et_collector.erl b/lib/et/src/et_collector.erl index ea23c188f7..289537541d 100644 --- a/lib/et/src/et_collector.erl +++ b/lib/et/src/et_collector.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2000-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 2000-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% %%---------------------------------------------------------------------- @@ -36,6 +36,7 @@ iterate/3, iterate/5, + lookup/2, start_trace_client/3, start_trace_port/1, @@ -45,6 +46,7 @@ get_global_pid/0, %% get_table_handle/1, + get_table_size/1, change_pattern/2, make_key/2, @@ -59,9 +61,12 @@ -export([init/1,terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-include("et_internal.hrl"). -include("../include/et.hrl"). -record(state, {parent_pid, + auto_shutdown, % Optionally shutdown when the last subscriber dies + event_tab_size, event_tab, dict_tab, event_order, @@ -102,7 +107,7 @@ %% stored by the collector. By replacing the default collector filter %% with a customized dito it is possible to allow any trace data as %% input. The collector filter is a dictionary entry with the -%% predefined key {filter, collector} and the value is a fun of +%% predefined key {filter, all} and the value is a fun of %% arity 1. See et_selector:parse_event/2 for interface details, %% such as which erlang:trace/1 tuples that are accepted. %% @@ -126,7 +131,7 @@ %% option() = %% {parent_pid, pid()} | %% {event_order, event_order()} | -%% {dict_insert, {filter, collector}, collector_fun()} | +%% {dict_insert, {filter, all}, collector_fun()} | %% {dict_insert, {filter, event_filter_name()}, event_filter_fun()} | %% {dict_insert, {subscriber, pid()}, dict_val()} | %% {dict_insert, dict_key(), dict_val()} | @@ -159,19 +164,16 @@ start_link(Options) -> case parse_opt(Options, default_state(), [], []) of - {ok, S, Dict2, Clients} when S#state.trace_global == false -> - case gen_server:start_link(?MODULE, [S, Dict2], []) of - {ok, Pid} when S#state.parent_pid /= self() -> - unlink(Pid), - start_clients(Pid, Clients); - {ok,Pid} -> - start_clients(Pid, Clients); - {error, Reason} -> - {error, Reason} - end; - {ok, S, Dict2, Clients} when S#state.trace_global == true -> - case gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) of - {ok, Pid} when S#state.parent_pid /= self() -> + {ok, S, Dict2, Clients} -> + Res = + case S#state.trace_global of + false -> + gen_server:start_link(?MODULE, [S, Dict2], []); + true -> + gen_server:start_link({global, ?MODULE}, ?MODULE, [S, Dict2], []) + end, + case Res of + {ok, Pid} when S#state.parent_pid =/= self() -> unlink(Pid), start_clients(Pid, Clients); {ok,Pid} -> @@ -185,6 +187,7 @@ start_link(Options) -> default_state() -> #state{parent_pid = self(), + auto_shutdown = false, event_order = trace_ts, subscribers = [], trace_global = false, @@ -196,28 +199,30 @@ default_state() -> parse_opt([], S, Dict, Clients) -> {Mod, Pattern} = et_selector:make_pattern(S#state.trace_pattern), Fun = fun(E) -> et_selector:parse_event(Mod, E) end, - Default = {dict_insert, {filter, collector}, Fun}, + Default = {dict_insert, {filter, ?DEFAULT_FILTER_NAME}, Fun}, {ok, S#state{trace_pattern = {Mod, Pattern}}, [Default | Dict], Clients}; parse_opt([H | T], S, Dict, Clients) -> case H of - {parent_pid, Parent} when Parent == undefined -> + {parent_pid, Parent} when Parent =:= undefined -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); - {parent_pid, Parent} when pid(Parent) -> + {parent_pid, Parent} when is_pid(Parent) -> parse_opt(T, S#state{parent_pid = Parent}, Dict, Clients); - {event_order, Order} when Order == trace_ts -> + {auto_shutdown, Bool} when Bool =:= true; Bool =:= false -> + parse_opt(T, S#state{auto_shutdown = Bool}, Dict, Clients); + {event_order, Order} when Order =:= trace_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); - {event_order, Order} when Order == event_ts -> + {event_order, Order} when Order =:= event_ts -> parse_opt(T, S#state{event_order = Order}, Dict, Clients); {dict_insert, {filter, Name}, Fun} -> if - atom(Name), function(Fun) -> + is_atom(Name), is_function(Fun) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} end; {dict_insert, {subscriber, Pid}, _Val} -> if - pid(Pid) -> + is_pid(Pid) -> parse_opt(T, S, Dict ++ [H], Clients); true -> {error, {bad_option, H}} @@ -228,17 +233,17 @@ parse_opt([H | T], S, Dict, Clients) -> parse_opt(T, S, Dict ++ [H], Clients); {trace_client, Client = {_, _}} -> parse_opt(T, S, Dict, Clients ++ [Client]); - {trace_global, Bool} when Bool == false -> + {trace_global, Bool} when Bool =:= false -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); - {trace_global, Bool} when Bool == true -> + {trace_global, Bool} when Bool =:= true -> parse_opt(T, S#state{trace_global = Bool}, Dict, Clients); - {trace_pattern, {Mod, _} = Pattern} when atom(Mod) -> + {trace_pattern, {Mod, _} = Pattern} when is_atom(Mod) -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); {trace_pattern, undefined = Pattern} -> parse_opt(T, S#state{trace_pattern = Pattern}, Dict, Clients); - {trace_port, Port} when integer(Port) -> + {trace_port, Port} when is_integer(Port) -> parse_opt(T, S#state{trace_port = Port}, Dict, Clients); - {trace_max_queue, MaxQueue} when integer(MaxQueue) -> + {trace_max_queue, MaxQueue} when is_integer(MaxQueue) -> parse_opt(T, S#state{trace_port = MaxQueue}, Dict, Clients); Bad -> {error, {bad_option, Bad}} @@ -352,19 +357,19 @@ do_load_event_file(Fun, Fd, Cont, Acc, FileName, BadBytes) -> %% Returns: {ok, Continuation} | exit(Reason) %%---------------------------------------------------------------------- -report(CollectorPid, TraceOrEvent) when pid(CollectorPid) -> +report(CollectorPid, TraceOrEvent) when is_pid(CollectorPid) -> case get_table_handle(CollectorPid) of - {ok, TH} when record(TH, table_handle) -> + {ok, TH} when is_record(TH, table_handle) -> report(TH, TraceOrEvent); {error, Reason} -> exit(Reason) end; -report(TH, TraceOrEvent) when record(TH, table_handle) -> +report(TH, TraceOrEvent) when is_record(TH, table_handle) -> Fun = TH#table_handle.filter, case Fun(TraceOrEvent) of false -> {ok, TH}; - true when record(TraceOrEvent, event) -> + true when is_record(TraceOrEvent, event) -> Key = make_key(TH, TraceOrEvent), case catch ets:insert(TH#table_handle.event_tab, {Key, TraceOrEvent}) of true -> @@ -373,7 +378,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) -> %% Refresh the report handle and try again report(TH#table_handle.collector_pid, TraceOrEvent) end; - {true, Event} when record(Event, event) -> + {true, Event} when is_record(Event, event) -> Key = make_key(TH, Event), case catch ets:insert(TH#table_handle.event_tab, {Key, Event}) of true -> @@ -401,7 +406,7 @@ report(TH, TraceOrEvent) when record(TH, table_handle) -> report(TH#table_handle.collector_pid, TraceOrEvent) end end; -report(TH, end_of_trace) when record(TH, table_handle) -> +report(TH, end_of_trace) when is_record(TH, table_handle) -> {ok, TH}; report(_, Bad) -> exit({bad_event, Bad}). @@ -410,7 +415,7 @@ report_event(CollectorPid, DetailLevel, FromTo, Label, Contents) -> report_event(CollectorPid, DetailLevel, FromTo, FromTo, Label, Contents). report_event(CollectorPid, DetailLevel, From, To, Label, Contents) - when integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100, list(Contents) -> + when is_integer(DetailLevel), DetailLevel >= 0, DetailLevel =< 100 -> TS= erlang:now(), E = #event{detail_level = DetailLevel, trace_ts = TS, @@ -431,32 +436,38 @@ report_event(CollectorPid, DetailLevel, From, To, Label, Contents) %% Key = record(event_ts) | record(trace_ts) %%---------------------------------------------------------------------- -make_key(TH, Stuff) when record(TH, table_handle) -> +make_key(TH, Stuff) when is_record(TH, table_handle) -> make_key(TH#table_handle.event_order, Stuff); make_key(trace_ts, Stuff) -> if - record(Stuff, event) -> + is_record(Stuff, event) -> #event{trace_ts = R, event_ts = P} = Stuff, #trace_ts{trace_ts = R, event_ts = P}; - record(Stuff, trace_ts) -> + is_record(Stuff, trace_ts) -> Stuff; - record(Stuff, event_ts) -> + is_record(Stuff, event_ts) -> #event_ts{trace_ts = R, event_ts = P} = Stuff, #trace_ts{trace_ts = R, event_ts = P} end; make_key(event_ts, Stuff) -> if - record(Stuff, event) -> + is_record(Stuff, event) -> #event{trace_ts = R, event_ts = P} = Stuff, #event_ts{trace_ts = R, event_ts = P}; - record(Stuff, event_ts) -> + is_record(Stuff, event_ts) -> Stuff; - record(Stuff, trace_ts) -> + is_record(Stuff, trace_ts) -> #trace_ts{trace_ts = R, event_ts = P} = Stuff, #event_ts{trace_ts = R, event_ts = P} end. %%---------------------------------------------------------------------- +%%---------------------------------------------------------------------- + +get_table_size(CollectorPid) when is_pid(CollectorPid) -> + call(CollectorPid, get_table_size). + +%%---------------------------------------------------------------------- %% get_table_handle(CollectorPid) -> Handle %% %% Return a table handle @@ -465,7 +476,7 @@ make_key(event_ts, Stuff) -> %% Handle = record(table_handle) %%---------------------------------------------------------------------- -get_table_handle(CollectorPid) when pid(CollectorPid) -> +get_table_handle(CollectorPid) when is_pid(CollectorPid) -> call(CollectorPid, get_table_handle). %%---------------------------------------------------------------------- @@ -480,7 +491,7 @@ get_table_handle(CollectorPid) when pid(CollectorPid) -> get_global_pid() -> case global:whereis_name(?MODULE) of - CollectorPid when pid(CollectorPid) -> + CollectorPid when is_pid(CollectorPid) -> CollectorPid; undefined -> exit(global_collector_not_started) @@ -505,7 +516,7 @@ change_pattern(CollectorPid, RawPattern) -> call(CollectorPid, {change_pattern, Pattern}). %%---------------------------------------------------------------------- -%% dict_insert(CollectorPid, {filter, collector}, FilterFun) -> ok +%% dict_insert(CollectorPid, {filter, all}, FilterFun) -> ok %% dict_insert(CollectorPid, {subscriber, SubscriberPid}, Void) -> ok %% dict_insert(CollectorPid, Key, Val) -> ok %% @@ -532,14 +543,14 @@ change_pattern(CollectorPid, RawPattern) -> dict_insert(CollectorPid, Key = {filter, Name}, Fun) -> if - atom(Name), function(Fun) -> + is_atom(Name), is_function(Fun) -> call(CollectorPid, {dict_insert, Key, Fun}); true -> exit({badarg, Key}) end; dict_insert(CollectorPid, Key = {subscriber, Pid}, Val) -> if - pid(Pid) -> + is_pid(Pid) -> call(CollectorPid, {dict_insert, Key, Val}); true -> exit({badarg, Key}) @@ -626,9 +637,9 @@ multicast(CollectorPid, Msg) -> %% Pid = dbg_trace_client_pid() %%---------------------------------------------------------------------- -start_trace_client(CollectorPid, Type, FileName) when Type == event_file -> +start_trace_client(CollectorPid, Type, FileName) when Type =:= event_file -> load_event_file(CollectorPid, FileName); -start_trace_client(CollectorPid, Type, FileName) when Type == file -> +start_trace_client(CollectorPid, Type, FileName) when Type =:= file -> WaitFor = {make_ref(), end_of_trace}, EventFun = fun(E, {ReplyTo, {ok, TH}}) -> {ReplyTo, report(TH, E)} end, EndFun = fun({ReplyTo, {ok, _TH}}) -> ReplyTo ! WaitFor, ReplyTo end, @@ -658,9 +669,9 @@ start_trace_client(CollectorPid, Type, Parameters) -> {trace_client_pid, Pid}. trace_spec_wrapper(EventFun, EndFun, EventInitialAcc) - when function(EventFun), function(EndFun) -> + when is_function(EventFun), is_function(EndFun) -> {fun(Trace, Acc) -> - case Trace == end_of_trace of + case Trace =:= end_of_trace of true -> EndFun(Acc); false -> EventFun(Trace, Acc) end @@ -702,24 +713,24 @@ iterate(Handle, Prev, Limit) -> %% Acc = NewAcc = term() %%---------------------------------------------------------------------- -iterate(_, _, Limit, _, Acc) when Limit == 0 -> +iterate(_, _, Limit, _, Acc) when Limit =:= 0 -> Acc; -iterate(CollectorPid, Prev, Limit, Fun, Acc) when pid(CollectorPid) -> +iterate(CollectorPid, Prev, Limit, Fun, Acc) when is_pid(CollectorPid) -> case get_table_handle(CollectorPid) of - {ok, TH} when record(TH, table_handle) -> + {ok, TH} when is_record(TH, table_handle) -> iterate(TH, Prev, Limit, Fun, Acc); {error, Reason} -> exit(Reason) end; -iterate(TH, Prev, Limit, Fun, Acc) when record(TH, table_handle) -> +iterate(TH, Prev, Limit, Fun, Acc) when is_record(TH, table_handle) -> if - Limit == infinity -> + Limit =:= infinity -> next_iterate(TH, Prev, Limit, Fun, Acc); - integer(Limit), Limit > 0 -> + is_integer(Limit), Limit > 0 -> next_iterate(TH, Prev, Limit, Fun, Acc); - Limit == '-infinity' -> + Limit =:= '-infinity' -> prev_iterate(TH, Prev, Limit, Fun, Acc); - integer(Limit), Limit < 0 -> + is_integer(Limit), Limit < 0 -> prev_iterate(TH, Prev, Limit, Fun, Acc) end. @@ -793,7 +804,7 @@ prev_iterate(TH, Prev, Limit, Fun, Acc) -> lookup_and_apply(TH, Prev, Next, Limit, 1, Fun, Acc) end. -lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun == undefined -> +lookup_and_apply(TH, _Prev, Next, Limit, Incr, Fun, _Acc) when Fun =:= undefined -> Limit2 = incr(Limit, Incr), iterate(TH, Next, Limit2, Fun, Next); lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> @@ -801,17 +812,33 @@ lookup_and_apply(TH, Prev, Next, Limit, Incr, Fun, Acc) -> case catch ets:lookup_element(Tab, Next, 2) of {'EXIT', _} -> iterate(TH#table_handle.collector_pid, Prev, Limit, Fun, Acc); - E when record(E, event) -> + E when is_record(E, event) -> Acc2 = Fun(E, Acc), Limit2 = incr(Limit, Incr), iterate(TH, Next, Limit2, Fun, Acc2) end. +lookup(CollectorPid, Key) when is_pid(CollectorPid) -> + case get_table_handle(CollectorPid) of + {ok, TH} when is_record(TH, table_handle) -> + lookup(TH, Key); + {error, Reason} -> + {error, Reason} + end; +lookup(TH, Key) when is_record(TH, table_handle) -> + Tab = TH#table_handle.event_tab, + case catch ets:lookup_element(Tab, Key, 2) of + {'EXIT', _} -> + {error, enoent}; + E when is_record(E, event) -> + {ok, E} + end. + incr(Val, Incr) -> if - Val == infinity -> Val; - Val == '-infinity' -> Val; - integer(Val) -> Val + Incr + Val =:= infinity -> Val; + Val =:= '-infinity' -> Val; + is_integer(Val) -> Val + Incr end. %%---------------------------------------------------------------------- @@ -824,13 +851,18 @@ incr(Val, Incr) -> %% table_handle() = record(table_handle) %%---------------------------------------------------------------------- -clear_table(CollectorPid) when pid(CollectorPid) -> +clear_table(CollectorPid) when is_pid(CollectorPid) -> call(CollectorPid, clear_table); -clear_table(TH) when record(TH, table_handle) -> +clear_table(TH) when is_record(TH, table_handle) -> clear_table(TH#table_handle.collector_pid). call(CollectorPid, Request) -> - gen_server:call(CollectorPid, Request, infinity). + try + gen_server:call(CollectorPid, Request, infinity) + catch + exit:{noproc,_} -> + {error, no_collector} + end. %%%---------------------------------------------------------------------- %%% Callback functions from gen_server @@ -849,7 +881,7 @@ init([InitialS, Dict]) -> case InitialS#state.parent_pid of undefined -> ignore; - Pid when pid(Pid) -> + Pid when is_pid(Pid) -> link(Pid) end, Funs = [fun init_tables/1, @@ -860,7 +892,7 @@ init([InitialS, Dict]) -> init_tables(S) -> EventTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), DictTab = ets:new(et_dict, [ordered_set, {keypos, 1}, public]), - S#state{event_tab = EventTab, dict_tab = DictTab}. + S#state{event_tab = EventTab, dict_tab = DictTab, event_tab_size = 0}. init_global(S) -> case S#state.trace_global of @@ -889,44 +921,53 @@ init_global(S) -> handle_call({multicast, Msg}, _From, S) -> do_multicast(S#state.subscribers, Msg), - {reply, ok, S}; + reply(ok, S); handle_call(Msg = {dict_insert, _Key, _Val}, _From, S) -> S2 = do_dict_insert(Msg, S), - {reply, ok, S2}; + reply(ok, S2); handle_call(Msg = {dict_delete, _Key}, _From, S) -> - S2 = do_dict_delete(Msg, S), - {reply, ok, S2}; - + try + S2 = do_dict_delete(Msg, S), + reply(ok, S2) + catch + throw:{stop, R} -> + opt_unlink(S#state.parent_pid), + {stop, R, S} + end; handle_call({dict_lookup, Key}, _From, S) -> Reply = ets:lookup(S#state.dict_tab, Key), - {reply, Reply, S}; + reply(Reply, S); handle_call({dict_match, Pattern}, _From, S) -> case catch ets:match_object(S#state.dict_tab, Pattern) of {'EXIT', _Reason} -> - {reply, [], S}; + reply([], S); Matching -> - {reply, Matching, S} + reply(Matching, S) end; handle_call(get_table_handle, _From, S) -> - [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, collector}), + [{_, TableFilter}] = ets:lookup(S#state.dict_tab, {filter, ?DEFAULT_FILTER_NAME}), TH = #table_handle{collector_pid = self(), event_tab = S#state.event_tab, event_order = S#state.event_order, filter = TableFilter}, - {reply, {ok, TH}, S}; + reply({ok, TH}, S); + +handle_call(get_table_size, _From, S) -> + Size = ets:info(S#state.event_tab, size), + reply({ok, Size}, S); handle_call(close, _From, S) -> case S#state.file of undefined -> - {reply, {error, file_not_open}, S}; + reply({error, file_not_open}, S); F -> Reply = disk_log:close(F#file.desc), S2 = S#state{file = undefined}, - {reply, Reply, S2} + reply(Reply, S2) end; handle_call({save_event_file, FileName, Options}, _From, S) -> Default = #file{name = FileName, @@ -934,7 +975,7 @@ handle_call({save_event_file, FileName, Options}, _From, S) -> file_opt = write, table_opt = keep}, case parse_file_options(Default, Options) of - {ok, F} when record(F, file) -> + {ok, F} when is_record(F, file) -> case file_open(F) of {ok, Fd} -> F2 = F#file{desc = Fd}, @@ -966,16 +1007,16 @@ handle_call({save_event_file, FileName, Options}, _From, S) -> end, case F2#file.table_opt of keep -> - {reply, Reply2, S3}; + reply(Reply2, S3); clear -> S4 = do_clear_table(S3), - {reply, Reply2, S4} + reply(Reply2, S4) end; {error, Reason} -> - {reply, {error, {file_open, Reason}}, S} + reply({error, {file_open, Reason}}, S) end; {error, Reason} -> - {reply, {error, Reason}, S} + reply({error, Reason}, S) end; handle_call({change_pattern, Pattern}, _From, S) -> @@ -983,11 +1024,11 @@ handle_call({change_pattern, Pattern}, _From, S) -> rpc:multicall(Ns, et_selector, change_pattern, [Pattern]), Reply = {old_pattern, S#state.trace_pattern}, S2 = S#state{trace_pattern = Pattern}, - {reply, Reply, S2}; + reply(Reply, S2); handle_call(clear_table, _From, S) -> S2 = do_clear_table(S), - {reply, ok, S2}; + reply(ok, S2); handle_call(stop, _From, S) -> do_multicast(S#state.subscribers, close), @@ -999,7 +1040,7 @@ handle_call(stop, _From, S) -> handle_call(Request, From, S) -> ok = error_logger:format("~p(~p): handle_call(~p, ~p, ~p)~n", [?MODULE, self(), Request, From, S]), - {reply, {error, {bad_request, Request}}, S}. + reply({error, {bad_request, Request}}, S). %%---------------------------------------------------------------------- %% Func: handle_cast/2 @@ -1011,7 +1052,7 @@ handle_call(Request, From, S) -> handle_cast(Msg, S) -> ok = error_logger:format("~p(~p): handle_cast(~p, ~p)~n", [?MODULE, self(), Msg, S]), - {noreply, S}. + noreply(S). %%---------------------------------------------------------------------- %% Func: handle_info/2 @@ -1020,54 +1061,67 @@ handle_cast(Msg, S) -> %% {stop, Reason, State} (terminate/2 is called) %%---------------------------------------------------------------------- +handle_info(timeout, S) -> + S2 = check_size(S), + noreply(S2); handle_info({nodeup, Node}, S) -> Port = S#state.trace_port, MaxQueue = S#state.trace_max_queue, case rpc:call(Node, ?MODULE, start_trace_port, [{Port, MaxQueue}]) of {ok, _} -> - listen_on_trace_port(Node, Port, S); - {error, Reason} when Reason == already_started-> + S2 = listen_on_trace_port(Node, Port, S), + noreply(S2); + {error, Reason} when Reason =:= already_started-> ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + noreply(S2); {badrpc, Reason} -> ok = error_logger:format("~p(~p): producer ignored(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + noreply(S2); {error, Reason} -> self() ! {nodeup, Node}, ok = error_logger:format("~p(~p): producer retry(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), S2 = S#state{trace_port = Port + 1}, - {noreply, S2} + noreply(S2) end; handle_info({nodedown, Node}, S) -> - {noreply, S#state{trace_nodes = S#state.trace_nodes -- [Node]}}; + noreply(S#state{trace_nodes = S#state.trace_nodes -- [Node]}); handle_info({register_trace_client, Pid}, S) -> link(Pid), - {noreply, S}; + noreply(S); -handle_info({'EXIT', Pid, Reason}, S) when Pid == S#state.parent_pid -> +handle_info({'EXIT', Pid, Reason}, S) when Pid =:= S#state.parent_pid -> {stop, Reason, S}; -handle_info(Info = {'EXIT', Pid, _Reason}, S) -> - OldSubscribers = S#state.subscribers, +handle_info(Info = {'EXIT', Pid, Reason}, S) -> + OldSubscribers = S#state.subscribers, case lists:member(Pid, OldSubscribers) of - true -> - S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), - {noreply, S2}; + true when Reason =:= shutdown -> + try + S2 = do_dict_delete({dict_delete, {subscriber, Pid}}, S), + noreply(S2) + catch + throw:{stop, R} -> + opt_unlink(S#state.parent_pid), + {stop, R, S} + end; + true -> + opt_unlink(S#state.parent_pid), + {stop, Reason, S}; false -> ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", [?MODULE, self(), Info, S]), - {noreply, S} + noreply(S) end; handle_info(Info, S) -> ok = error_logger:format("~p(~p): handle_info(~p, ~p)~n", [?MODULE, self(), Info, S]), - {noreply, S}. + noreply(S). listen_on_trace_port(Node, Port, S) -> [_Name, Host] = string:tokens(atom_to_list(Node), [$@]), @@ -1075,20 +1129,17 @@ listen_on_trace_port(Node, Port, S) -> {trace_client_pid, RemotePid} -> rpc:call(Node, et_selector, change_pattern, [S#state.trace_pattern]), link(RemotePid), - S2 = S#state{trace_nodes = [Node | S#state.trace_nodes], - trace_port = Port + 1}, - {noreply, S2}; - {'EXIT', Reason} when Reason == already_started-> + S#state{trace_nodes = [Node | S#state.trace_nodes], + trace_port = Port + 1}; + {'EXIT', Reason} when Reason =:= already_started-> ok = error_logger:format("~p(~p): consumer ignored(~p:~p): ~p~n", [?MODULE, self(), Node, Port, Reason]), - S2 = S#state{trace_port = Port + 1}, - {noreply, S2}; + S#state{trace_port = Port + 1}; {'EXIT', Reason} -> self() ! {nodeup, Node}, ok = error_logger:format("~p(~p): consumer retry(~p:~p):~n ~p~n", [?MODULE, self(), Node, Port, Reason]), - S2 = S#state{trace_port = Port + 1}, - {noreply, S2} + S#state{trace_port = Port + 1} end. %%---------------------------------------------------------------------- @@ -1120,7 +1171,7 @@ do_clear_table(S) -> NewTab = ets:new(et_events, [ordered_set, {keypos, 1}, public]), S#state{event_tab = NewTab}. -do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pid) -> +do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when is_pid(Pid) -> OldSubscribers = S#state.subscribers, NewSubscribers = case lists:member(Pid, OldSubscribers) of @@ -1133,6 +1184,8 @@ do_dict_insert(Msg = {dict_insert, Key = {subscriber, Pid}, Val}, S) when pid(Pi [Pid | OldSubscribers] end, do_multicast(NewSubscribers, Msg), + Size = ets:info(S#state.event_tab, size), + do_multicast(NewSubscribers, {more_events, Size}), ets:insert(S#state.dict_tab, {Key, Val}), S#state{subscribers = NewSubscribers}; do_dict_insert(Msg = {dict_insert, Key, Val}, S) -> @@ -1147,11 +1200,18 @@ do_dict_delete(Msg = {dict_delete, Key = {subscriber, Pid}}, S) -> case lists:member(Pid, OldSubscribers) of true -> unlink(Pid), - S#state{subscribers = OldSubscribers -- [Pid]}; + S2 = S#state{subscribers = OldSubscribers -- [Pid]}, + if + S2#state.auto_shutdown, + S2#state.subscribers =:= [] -> + throw({stop, shutdown}); + true -> + S2 + end; false -> S end; -do_dict_delete({dict_delete, {filter, collector}}, S) -> +do_dict_delete({dict_delete, {filter, ?DEFAULT_FILTER_NAME}}, S) -> S; do_dict_delete(Msg = {dict_delete, Key}, S) -> do_multicast(S#state.subscribers, Msg), @@ -1202,3 +1262,33 @@ do_multicast([Pid | Pids], Msg) -> do_multicast(Pids, Msg); do_multicast([], _Msg) -> ok. + +opt_unlink(Pid) -> + if + Pid =:= undefined -> + ignore; + true -> + unlink(Pid) + end. + +reply(Reply, #state{subscribers = []} = S) -> + {reply, Reply, S}; +reply(Reply, S) -> + {reply, Reply, S, 500}. + +noreply(#state{subscribers = []} = S) -> + {noreply, S}; +noreply(S) -> + {noreply, S, 500}. + +check_size(S) -> + Size = ets:info(S#state.event_tab, size), + if + Size =:= S#state.event_tab_size -> + S; + true -> + %% Tell the subscribers that more events are available + Msg = {more_events, Size}, + do_multicast(S#state.subscribers, Msg), + S#state{event_tab_size = Size} + end. |