diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_subscr.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_subscr.erl | 494 |
1 files changed, 494 insertions, 0 deletions
diff --git a/lib/mnesia/src/mnesia_subscr.erl b/lib/mnesia/src/mnesia_subscr.erl new file mode 100644 index 0000000000..afd1704dec --- /dev/null +++ b/lib/mnesia/src/mnesia_subscr.erl @@ -0,0 +1,494 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +-module(mnesia_subscr). + +-behaviour(gen_server). + +-export([start/0, + set_debug_level/1, + subscribe/2, + unsubscribe/2, + unsubscribe_table/1, + subscribers/0, + report_table_event/4, + report_table_event/5, + report_table_event/6 + ]). + +%% gen_server callbacks +-export([init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). + +-include("mnesia.hrl"). + +-import(mnesia_lib, [error/2]). +-record(state, {supervisor, pid_tab}). + +start() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [self()], + [{timeout, infinity}]). + +set_debug_level(Level) -> + OldEnv = application:get_env(mnesia, debug), + case mnesia_monitor:patch_env(debug, Level) of + {error, Reason} -> + {error, Reason}; + NewLevel -> + set_debug_level(NewLevel, OldEnv) + end. + +set_debug_level(Level, OldEnv) -> + case mnesia:system_info(is_running) of + no when OldEnv == undefined -> + none; + no -> + {ok, E} = OldEnv, + E; + _ -> + Old = mnesia_lib:val(debug), + Local = mnesia:system_info(local_tables), + E = whereis(mnesia_event), + Sub = fun(Tab) -> subscribe(E, {table, Tab}) end, + UnSub = fun(Tab) -> unsubscribe(E, {table, Tab}) end, + + case Level of + none -> + lists:foreach(UnSub, Local); + verbose -> + lists:foreach(UnSub, Local); + debug -> + lists:foreach(UnSub, Local -- [schema]), + Sub(schema); + trace -> + lists:foreach(Sub, Local) + end, + mnesia_lib:set(debug, Level), + Old + end. + +subscribe(ClientPid, system) -> + change_subscr(activate, ClientPid, system); +subscribe(ClientPid, {table, Tab}) -> + change_subscr(activate, ClientPid, {table, Tab, simple}); +subscribe(ClientPid, {table, Tab, simple}) -> + change_subscr(activate, ClientPid, {table, Tab, simple}); +subscribe(ClientPid, {table, Tab, detailed}) -> + change_subscr(activate, ClientPid, {table, Tab, detailed}); +subscribe(_ClientPid, What) -> + {error, {badarg, What}}. + +unsubscribe(ClientPid, system) -> + change_subscr(deactivate, ClientPid, system); +unsubscribe(ClientPid, {table, Tab}) -> + change_subscr(deactivate, ClientPid, {table, Tab, simple}); +unsubscribe(ClientPid, {table, Tab, simple}) -> + change_subscr(deactivate, ClientPid, {table, Tab, simple}); +unsubscribe(ClientPid, {table, Tab, detailed}) -> + change_subscr(deactivate, ClientPid, {table, Tab, detailed}); +unsubscribe(_ClientPid, What) -> + {error, {badarg, What}}. + +unsubscribe_table(Tab) -> + call({change, {deactivate_table, Tab}}). + +change_subscr(Kind, ClientPid, What) -> + call({change, {Kind, ClientPid, What}}). + +subscribers() -> + [whereis(mnesia_event) | mnesia_lib:val(subscribers)]. + +report_table_event(Tab, Tid, Obj, Op) -> + case ?catch_val({Tab, commit_work}) of + {'EXIT', _} -> ok; + Commit -> + case lists:keysearch(subscribers, 1, Commit) of + false -> ok; + {value, Subs} -> + report_table_event(Subs, Tab, Tid, Obj, Op, undefined) + end + end. + +%% Backwards compatible for the moment when mnesia_tm get's updated! +report_table_event(Subscr, Tab, Tid, Obj, Op) -> + report_table_event(Subscr, Tab, Tid, Obj, Op, undefined). + +report_table_event({subscribers, S1, S2}, Tab, Tid, _Obj, clear_table, _Old) -> + What = {delete, {schema, Tab}, Tid}, + deliver(S1, {mnesia_table_event, What}), + TabDef = mnesia_schema:cs2list(?catch_val({Tab, cstruct})), + What2 = {write, {schema, Tab, TabDef}, Tid}, + deliver(S1, {mnesia_table_event, What2}), + What3 = {delete, schema, {schema, Tab}, [{schema, Tab, TabDef}], Tid}, + deliver(S2, {mnesia_table_event, What3}), + What4 = {write, schema, {schema, Tab, TabDef}, [], Tid}, + deliver(S2, {mnesia_table_event, What4}); + +report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, _Old) -> + What = {Op, patch_record(Tab, Obj), Tid}, + deliver(Subscr, {mnesia_table_event, What}); + +report_table_event({subscribers, S1, S2}, Tab, Tid, Obj, Op, Old) -> + Standard = {Op, patch_record(Tab, Obj), Tid}, + deliver(S1, {mnesia_table_event, Standard}), + Extended = what(Tab, Tid, Obj, Op, Old), + deliver(S2, Extended); + +%% Backwards compatible for the moment when mnesia_tm get's updated! +report_table_event({subscribers, Subscr}, Tab, Tid, Obj, Op, Old) -> + report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, Old). + + +patch_record(Tab, Obj) -> + case Tab == element(1, Obj) of + true -> + Obj; + false -> + setelement(1, Obj, Tab) + end. + +what(Tab, Tid, {RecName, Key}, delete, undefined) -> + case catch mnesia_lib:db_get(Tab, Key) of + Old when is_list(Old) -> %% Op only allowed for set table. + {mnesia_table_event, {delete, Tab, {RecName, Key}, Old, Tid}}; + _ -> + %% Record just deleted by a dirty_op or + %% the whole table has been deleted + ignore + end; +what(Tab, Tid, Obj, delete, Old) -> + {mnesia_table_event, {delete, Tab, Obj, Old, Tid}}; +what(Tab, Tid, Obj, delete_object, _Old) -> + {mnesia_table_event, {delete, Tab, Obj, [Obj], Tid}}; +what(Tab, Tid, Obj, write, undefined) -> + case catch mnesia_lib:db_get(Tab, element(2, Obj)) of + Old when is_list(Old) -> + {mnesia_table_event, {write, Tab, Obj, Old, Tid}}; + {'EXIT', _} -> + ignore + end. + +deliver(_, ignore) -> + ok; +deliver([Pid | Pids], Msg) -> + Pid ! Msg, + deliver(Pids, Msg); +deliver([], _Msg) -> + ok. + +call(Msg) -> + Pid = whereis(?MODULE), + case Pid of + undefined -> + {error, {node_not_running, node()}}; + Pid -> + Res = gen_server:call(Pid, Msg, infinity), + %% We get an exit signal if server dies + receive + {'EXIT', _Pid, _Reason} -> + {error, {node_not_running, node()}} + after 0 -> + Res + end + end. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Callback functions from gen_server + +%%---------------------------------------------------------------------- +%% Func: init/1 +%% Returns: {ok, State} | +%% {ok, State, Timeout} | +%% {stop, Reason} +%%---------------------------------------------------------------------- +init([Parent]) -> + process_flag(trap_exit, true), + ClientPid = whereis(mnesia_event), + link(ClientPid), + mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]), + Tab = ?ets_new_table(mnesia_subscr, [duplicate_bag, private]), + ?ets_insert(Tab, {ClientPid, system}), + {ok, #state{supervisor = Parent, pid_tab = Tab}}. + +%%---------------------------------------------------------------------- +%% Func: handle_call/3 +%% Returns: {reply, Reply, State} | +%% {reply, Reply, State, Timeout} | +%% {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, Reply, State} | (terminate/2 is called) +%%---------------------------------------------------------------------- +handle_call({change, How}, _From, State) -> + Reply = do_change(How, State#state.pid_tab), + {reply, Reply, State}; + +handle_call(Msg, _From, State) -> + error("~p got unexpected call: ~p~n", [?MODULE, Msg]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Func: handle_cast/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- +handle_cast(Msg, State) -> + error("~p got unexpected cast: ~p~n", [?MODULE, Msg]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Func: handle_info/2 +%% Returns: {noreply, State} | +%% {noreply, State, Timeout} | +%% {stop, Reason, State} (terminate/2 is called) +%%---------------------------------------------------------------------- + +handle_info({'EXIT', Pid, _R}, State) when Pid == State#state.supervisor -> + {stop, shutdown, State}; + +handle_info({'EXIT', Pid, _Reason}, State) -> + handle_exit(Pid, State#state.pid_tab), + {noreply, State}; + +handle_info(Msg, State) -> + error("~p got unexpected info: ~p~n", [?MODULE, Msg]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Func: terminate/2 +%% Purpose: Shutdown the server +%% Returns: any (ignored by gen_server) +%%---------------------------------------------------------------------- +terminate(Reason, State) -> + prepare_stop(State#state.pid_tab), + mnesia_monitor:terminate_proc(?MODULE, Reason, State). + +%%---------------------------------------------------------------------- +%% Func: code_change/3 +%% Purpose: Upgrade process when its code is to be changed +%% Returns: {ok, NewState} +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%---------------------------------------------------------------------- +%%% Internal functions +%%%---------------------------------------------------------------------- + +do_change({activate, ClientPid, system}, SubscrTab) when is_pid(ClientPid) -> + Var = subscribers, + activate(ClientPid, system, Var, subscribers(), SubscrTab); +do_change({activate, ClientPid, {table, Tab, How}}, SubscrTab) when is_pid(ClientPid) -> + case ?catch_val({Tab, where_to_read}) of + Node when Node == node() -> + Var = {Tab, commit_work}, + activate(ClientPid, {table, Tab, How}, Var, mnesia_lib:val(Var), SubscrTab); + {'EXIT', _} -> + {error, {no_exists, Tab}}; + _Node -> + {error, {not_active_local, Tab}} + end; +do_change({deactivate, ClientPid, system}, SubscrTab) -> + Var = subscribers, + deactivate(ClientPid, system, Var, SubscrTab); +do_change({deactivate, ClientPid, {table, Tab, How}}, SubscrTab) -> + Var = {Tab, commit_work}, + deactivate(ClientPid, {table, Tab, How}, Var, SubscrTab); +do_change({deactivate_table, Tab}, SubscrTab) -> + Var = {Tab, commit_work}, + case ?catch_val(Var) of + {'EXIT', _} -> + {error, {no_exists, Tab}}; + CommitWork -> + case lists:keysearch(subscribers, 1, CommitWork) of + false -> + ok; + {value, Subs} -> + Simple = {table, Tab, simple}, + Detailed = {table, Tab, detailed}, + Fs = fun(C) -> deactivate(C, Simple, Var, SubscrTab) end, + Fd = fun(C) -> deactivate(C, Detailed, Var, SubscrTab) end, + case Subs of + {subscribers, L1, L2} -> + lists:foreach(Fs, L1), + lists:foreach(Fd, L2); + {subscribers, L1} -> + lists:foreach(Fs, L1) + end + end, + {ok, node()} + end; +do_change(_, _) -> + {error, badarg}. + +activate(ClientPid, What, Var, OldSubscribers, SubscrTab) -> + Old = + if Var == subscribers -> + OldSubscribers; + true -> + case lists:keysearch(subscribers, 1, OldSubscribers) of + false -> []; + {value, Subs} -> + case Subs of + {subscribers, L1, L2} -> + L1 ++ L2; + {subscribers, L1} -> + L1 + end + end + end, + case lists:member(ClientPid, Old) of + false -> + %% Don't care about checking old links + case catch link(ClientPid) of + true -> + ?ets_insert(SubscrTab, {ClientPid, What}), + add_subscr(Var, What, ClientPid), + {ok, node()}; + {'EXIT', _Reason} -> + {error, {no_exists, ClientPid}} + end; + true -> + {error, {already_exists, What}} + end. + +%%-record(subscribers, {pids = []}). Old subscriber record removed +%% To solve backward compatibility, this code is a cludge.. +add_subscr(subscribers, _What, Pid) -> + mnesia_lib:add(subscribers, Pid), + {ok, node()}; +add_subscr({Tab, commit_work}, What, Pid) -> + Commit = mnesia_lib:val({Tab, commit_work}), + case lists:keysearch(subscribers, 1, Commit) of + false -> + Subscr = + case What of + {table, _, simple} -> + {subscribers, [Pid], []}; + {table, _, detailed} -> + {subscribers, [], [Pid]} + end, + mnesia_lib:add({Tab, subscribers}, Pid), + mnesia_lib:set({Tab, commit_work}, + mnesia_lib:sort_commit([Subscr | Commit])); + {value, Old} -> + {L1, L2} = + case Old of + {subscribers, L} -> %% Old Way + {L, []}; + {subscribers, SL1, SL2} -> + {SL1, SL2} + end, + Subscr = + case What of + {table, _, simple} -> + {subscribers, [Pid | L1], L2}; + {table, _, detailed} -> + {subscribers, L1, [Pid | L2]} + end, + NewC = lists:keyreplace(subscribers, 1, Commit, Subscr), + mnesia_lib:set({Tab, commit_work}, + mnesia_lib:sort_commit(NewC)), + mnesia_lib:add({Tab, subscribers}, Pid) + end. + +deactivate(ClientPid, What, Var, SubscrTab) -> + ?ets_match_delete(SubscrTab, {ClientPid, What}), + case catch ?ets_lookup_element(SubscrTab, ClientPid, 1) of + List when is_list(List) -> + ignore; + {'EXIT', _} -> + unlink(ClientPid) + end, + del_subscr(Var, What, ClientPid), + {ok, node()}. + +del_subscr(subscribers, _What, Pid) -> + mnesia_lib:del(subscribers, Pid); +del_subscr({Tab, commit_work}, What, Pid) -> + Commit = mnesia_lib:val({Tab, commit_work}), + case lists:keysearch(subscribers, 1, Commit) of + false -> + false; + {value, Old} -> + {L1, L2} = + case Old of + {subscribers, L} -> %% Old Way + {L, []}; + {subscribers, SL1, SL2} -> + {SL1, SL2} + end, + Subscr = + case What of %% Ignore user error delete subscr from any list + {table, _, simple} -> + NewL1 = lists:delete(Pid, L1), + NewL2 = lists:delete(Pid, L2), + {subscribers, NewL1, NewL2}; + {table, _, detailed} -> + NewL1 = lists:delete(Pid, L1), + NewL2 = lists:delete(Pid, L2), + {subscribers, NewL1, NewL2} + end, + case Subscr of + {subscribers, [], []} -> + NewC = lists:keydelete(subscribers, 1, Commit), + mnesia_lib:del({Tab, subscribers}, Pid), + mnesia_lib:set({Tab, commit_work}, + mnesia_lib:sort_commit(NewC)); + _ -> + NewC = lists:keyreplace(subscribers, 1, Commit, Subscr), + mnesia_lib:del({Tab, subscribers}, Pid), + mnesia_lib:set({Tab, commit_work}, + mnesia_lib:sort_commit(NewC)) + end + end. + +handle_exit(ClientPid, SubscrTab) -> + do_handle_exit(?ets_lookup(SubscrTab, ClientPid)), + ?ets_delete(SubscrTab, ClientPid). + +do_handle_exit([{ClientPid, What} | Tail]) -> + case What of + system -> + del_subscr(subscribers, What, ClientPid); + {_, Tab, _Level} -> + del_subscr({Tab, commit_work}, What, ClientPid) + end, + do_handle_exit(Tail); +do_handle_exit([]) -> + ok. + +prepare_stop(SubscrTab) -> + mnesia_lib:report_system_event({mnesia_down, node()}), + do_prepare_stop(?ets_first(SubscrTab), SubscrTab). + +do_prepare_stop('$end_of_table', _SubscrTab) -> + ok; +do_prepare_stop(ClientPid, SubscrTab) -> + Next = ?ets_next(SubscrTab, ClientPid), + handle_exit(ClientPid, SubscrTab), + unlink(ClientPid), + do_prepare_stop(Next, SubscrTab). + |