From 999f1b99b295a69d3c505b601f14554acf4522d3 Mon Sep 17 00:00:00 2001 From: Bernard Duggan Date: Fri, 12 Feb 2010 11:07:37 +1100 Subject: Add mnesia activity subscription message A process that calls mnesia:subscribe(activity) will receive the message: {mnesia_activity_event, ActivityID, complete} when any activity that caused a change to a database has finished committing its changes. This allows a subscriber to collect messages already available through the mnesia:subscribe({table, ...}) system to group them as completed transactions. --- lib/mnesia/src/mnesia_monitor.erl | 1 + lib/mnesia/src/mnesia_subscr.erl | 41 ++++++++++++++++++++++++++++++++------- lib/mnesia/src/mnesia_tm.erl | 4 +++- 3 files changed, 38 insertions(+), 8 deletions(-) (limited to 'lib/mnesia/src') diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl index 5df5df4969..5bd93d6b9b 100644 --- a/lib/mnesia/src/mnesia_monitor.erl +++ b/lib/mnesia/src/mnesia_monitor.erl @@ -256,6 +256,7 @@ init([Parent]) -> ?ets_new_table(mnesia_gvar, [set, public, named_table]), ?ets_new_table(mnesia_stats, [set, public, named_table]), set(subscribers, []), + set(activity_subscribers, []), mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]), Version = mnesia:system_info(version), set(version, Version), diff --git a/lib/mnesia/src/mnesia_subscr.erl b/lib/mnesia/src/mnesia_subscr.erl index afd1704dec..93d4a86f7f 100644 --- a/lib/mnesia/src/mnesia_subscr.erl +++ b/lib/mnesia/src/mnesia_subscr.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2009. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 1997-2010. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% @@ -30,7 +30,8 @@ subscribers/0, report_table_event/4, report_table_event/5, - report_table_event/6 + report_table_event/6, + report_activity/1 ]). %% gen_server callbacks @@ -91,6 +92,8 @@ set_debug_level(Level, OldEnv) -> subscribe(ClientPid, system) -> change_subscr(activate, ClientPid, system); +subscribe(ClientPid, activity) -> + change_subscr(activate, ClientPid, activity); subscribe(ClientPid, {table, Tab}) -> change_subscr(activate, ClientPid, {table, Tab, simple}); subscribe(ClientPid, {table, Tab, simple}) -> @@ -102,6 +105,8 @@ subscribe(_ClientPid, What) -> unsubscribe(ClientPid, system) -> change_subscr(deactivate, ClientPid, system); +unsubscribe(ClientPid, activity) -> + change_subscr(deactivate, ClientPid, activity); unsubscribe(ClientPid, {table, Tab}) -> change_subscr(deactivate, ClientPid, {table, Tab, simple}); unsubscribe(ClientPid, {table, Tab, simple}) -> @@ -120,6 +125,15 @@ change_subscr(Kind, ClientPid, What) -> subscribers() -> [whereis(mnesia_event) | mnesia_lib:val(subscribers)]. +report_activity({dirty, _pid}) -> + ok; +report_activity(Tid) -> + case ?catch_val(activity_subscribers) of + {'EXIT', _} -> ok; + Subscribers -> + deliver(Subscribers, {mnesia_activity_event, {complete, Tid}}) + end. + report_table_event(Tab, Tid, Obj, Op) -> case ?catch_val({Tab, commit_work}) of {'EXIT', _} -> ok; @@ -300,6 +314,9 @@ code_change(_OldVsn, State, _Extra) -> do_change({activate, ClientPid, system}, SubscrTab) when is_pid(ClientPid) -> Var = subscribers, activate(ClientPid, system, Var, subscribers(), SubscrTab); +do_change({activate, ClientPid, activity}, SubscrTab) when is_pid(ClientPid) -> + Var = activity_subscribers, + activate(ClientPid, activity, Var, mnesia_lib:val(Var), SubscrTab); do_change({activate, ClientPid, {table, Tab, How}}, SubscrTab) when is_pid(ClientPid) -> case ?catch_val({Tab, where_to_read}) of Node when Node == node() -> @@ -313,6 +330,9 @@ do_change({activate, ClientPid, {table, Tab, How}}, SubscrTab) when is_pid(Clien do_change({deactivate, ClientPid, system}, SubscrTab) -> Var = subscribers, deactivate(ClientPid, system, Var, SubscrTab); +do_change({deactivate, ClientPid, activity}, SubscrTab) -> + Var = activity_subscribers, + deactivate(ClientPid, activity, Var, SubscrTab); do_change({deactivate, ClientPid, {table, Tab, How}}, SubscrTab) -> Var = {Tab, commit_work}, deactivate(ClientPid, {table, Tab, How}, Var, SubscrTab); @@ -345,7 +365,7 @@ do_change(_, _) -> activate(ClientPid, What, Var, OldSubscribers, SubscrTab) -> Old = - if Var == subscribers -> + if Var == subscribers orelse Var == activity_subscribers -> OldSubscribers; true -> case lists:keysearch(subscribers, 1, OldSubscribers) of @@ -379,6 +399,9 @@ activate(ClientPid, What, Var, OldSubscribers, SubscrTab) -> add_subscr(subscribers, _What, Pid) -> mnesia_lib:add(subscribers, Pid), {ok, node()}; +add_subscr(activity_subscribers, _What, Pid) -> + mnesia_lib:add(activity_subscribers, Pid), + {ok, node()}; add_subscr({Tab, commit_work}, What, Pid) -> Commit = mnesia_lib:val({Tab, commit_work}), case lists:keysearch(subscribers, 1, Commit) of @@ -427,6 +450,8 @@ deactivate(ClientPid, What, Var, SubscrTab) -> del_subscr(subscribers, _What, Pid) -> mnesia_lib:del(subscribers, Pid); +del_subscr(activity_subscribers, _What, Pid) -> + mnesia_lib:del(activity_subscribers, Pid); del_subscr({Tab, commit_work}, What, Pid) -> Commit = mnesia_lib:val({Tab, commit_work}), case lists:keysearch(subscribers, 1, Commit) of @@ -473,6 +498,8 @@ do_handle_exit([{ClientPid, What} | Tail]) -> case What of system -> del_subscr(subscribers, What, ClientPid); + activity -> + del_subscr(activity_subscribers, What, ClientPid); {_, Tab, _Level} -> del_subscr({Tab, commit_work}, What, ClientPid) end, diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl index d42109c3da..f3ffac5493 100644 --- a/lib/mnesia/src/mnesia_tm.erl +++ b/lib/mnesia/src/mnesia_tm.erl @@ -1733,7 +1733,9 @@ do_commit(Tid, C, DumperMode) -> R = do_snmp(Tid, C#commit.snmp), R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R), R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2), - do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3). + R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3), + mnesia_subscr:report_activity(Tid), + R4. %% Update the items do_update(Tid, Storage, [Op | Ops], OldRes) -> -- cgit v1.2.3