aboutsummaryrefslogblamecommitdiffstats
path: root/lib/mnesia/src/mnesia_subscr.erl
blob: 9cf955b4d3fd29788a4dfeaab400633c12d0e8c9 (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                   
  
                                                        
  




                                                                      
  



                                                                         
  















                               

                              










                       

                                     
















































                                                                 

                                                 










                                                               

                                                   

















                                                                 








                                                                          

















































                                                                               




                                                                         







                                                         



                                                             
                  


                                                      














































































































                                                                             


                                                                               












                                                                                        


                                                          































                                                                               
                                                                   















                                                                       
                                  


                                                              

                                










                                                                   


                                               




































                                                                      



                                                     
        





                                         


                                      

                                               













































                                                                             

                                                              


















                                                               
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%%
-module(mnesia_subscr).

-behaviour(gen_server).

-export([start/0,
	 set_debug_level/1,
	 subscribe/2,
	 unsubscribe/2,
	 unsubscribe_table/1,
	 subscribers/0,
	 report_table_event/4,
	 report_table_event/5, 
	 report_table_event/6,
	 report_activity/1
	]).

%% gen_server callbacks
-export([init/1,
	 handle_call/3,
	 handle_cast/2,
	 handle_info/2,
	 terminate/2,
	 code_change/3
	]).

-compile({no_auto_import,[error/2]}).

-include("mnesia.hrl").

-import(mnesia_lib, [error/2]).
-record(state, {supervisor, pid_tab}).

start() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [self()],
			  [{timeout, infinity}]).

set_debug_level(Level) ->
    OldEnv = application:get_env(mnesia, debug),
    case mnesia_monitor:patch_env(debug, Level) of
	{error, Reason} ->
	    {error, Reason};
	NewLevel ->
	    set_debug_level(NewLevel, OldEnv)
    end.

set_debug_level(Level, OldEnv) ->
    case mnesia:system_info(is_running) of
	no when OldEnv == undefined ->
	    none;
	no ->
	    {ok, E} = OldEnv,
	    E;
	_ ->
	    Old = mnesia_lib:val(debug),
	    Local = mnesia:system_info(local_tables),
	    E = whereis(mnesia_event),
	    Sub = fun(Tab) -> subscribe(E, {table, Tab}) end,
	    UnSub = fun(Tab) -> unsubscribe(E, {table, Tab}) end,
	
	    case Level of
		none ->
		    lists:foreach(UnSub, Local);
		verbose ->
		    lists:foreach(UnSub, Local);
		debug ->
		    lists:foreach(UnSub, Local -- [schema]),
		    Sub(schema);
		trace ->
		    lists:foreach(Sub, Local)
	    end,
	    mnesia_lib:set(debug, Level),
	    Old
    end.

subscribe(ClientPid, system) ->
    change_subscr(activate, ClientPid, system);
subscribe(ClientPid, activity) ->
    change_subscr(activate, ClientPid, activity);
subscribe(ClientPid, {table, Tab}) ->
    change_subscr(activate, ClientPid, {table, Tab, simple});
subscribe(ClientPid, {table, Tab, simple}) ->
    change_subscr(activate, ClientPid, {table, Tab, simple});
subscribe(ClientPid, {table, Tab, detailed}) ->
    change_subscr(activate, ClientPid, {table, Tab, detailed});
subscribe(_ClientPid, What) ->
    {error, {badarg, What}}.

unsubscribe(ClientPid, system) ->
    change_subscr(deactivate, ClientPid, system);
unsubscribe(ClientPid, activity) ->
    change_subscr(deactivate, ClientPid, activity);
unsubscribe(ClientPid, {table, Tab}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, simple});
unsubscribe(ClientPid, {table, Tab, simple}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, simple});
unsubscribe(ClientPid, {table, Tab, detailed}) ->
    change_subscr(deactivate, ClientPid, {table, Tab, detailed});
unsubscribe(_ClientPid, What) ->
    {error, {badarg, What}}.

unsubscribe_table(Tab) ->
    call({change, {deactivate_table, Tab}}).

change_subscr(Kind, ClientPid, What) ->
    call({change, {Kind, ClientPid, What}}).

subscribers() ->
    [whereis(mnesia_event) | mnesia_lib:val(subscribers)].

report_activity({dirty, _pid}) -> 
    ok;
report_activity(Tid) ->
    case ?catch_val(activity_subscribers) of
	{'EXIT', _} -> ok;
	Subscribers ->
	    deliver(Subscribers, {mnesia_activity_event, {complete, Tid}})
	end.

report_table_event(Tab, Tid, Obj, Op) ->   
    case ?catch_val({Tab, commit_work}) of
	{'EXIT', _} -> ok;
	Commit ->
	    case lists:keysearch(subscribers, 1, Commit) of
		false -> ok;
		{value, Subs} -> 
		    report_table_event(Subs, Tab, Tid, Obj, Op, undefined)
	    end
    end.

%% Backwards compatible for the moment when mnesia_tm get's updated!
report_table_event(Subscr, Tab, Tid, Obj, Op) ->
    report_table_event(Subscr, Tab, Tid, Obj, Op, undefined).

report_table_event({subscribers, S1, S2}, Tab, Tid, _Obj, clear_table, _Old) ->
    What   = {delete, {schema, Tab}, Tid},
    deliver(S1, {mnesia_table_event, What}),
    TabDef = mnesia_schema:cs2list(?catch_val({Tab, cstruct})),
    What2  = {write, {schema, Tab, TabDef}, Tid},
    deliver(S1, {mnesia_table_event, What2}),
    What3  = {delete, schema, {schema, Tab}, [{schema, Tab, TabDef}], Tid},
    deliver(S2, {mnesia_table_event, What3}),
    What4  = {write, schema,  {schema, Tab, TabDef}, [], Tid},
    deliver(S2, {mnesia_table_event, What4});

report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, _Old) ->
    What = {Op, patch_record(Tab, Obj), Tid},
    deliver(Subscr, {mnesia_table_event, What});

report_table_event({subscribers, S1, S2}, Tab, Tid, Obj, Op, Old) ->
    Standard = {Op, patch_record(Tab, Obj), Tid},
    deliver(S1, {mnesia_table_event, Standard}), 
    Extended = what(Tab, Tid, Obj, Op, Old), 
    deliver(S2, Extended);

%% Backwards compatible for the moment when mnesia_tm get's updated!
report_table_event({subscribers, Subscr}, Tab, Tid, Obj, Op, Old) ->    
    report_table_event({subscribers, Subscr, []}, Tab, Tid, Obj, Op, Old).


patch_record(Tab, Obj) ->
    case Tab == element(1, Obj) of
	true -> 
	    Obj;
	false ->
	    setelement(1, Obj, Tab)
    end.

what(Tab, Tid, {RecName, Key}, delete, undefined) ->
    try mnesia_lib:db_get(Tab, Key) of
	Old -> %% Op only allowed for set table.
	    {mnesia_table_event, {delete, Tab, {RecName, Key}, Old, Tid}}
    catch error:_ ->
	    %% Record just deleted by a dirty_op or
	    %% the whole table has been deleted
	    ignore
    end;
what(Tab, Tid, Obj, delete, Old) ->
    {mnesia_table_event, {delete, Tab, Obj, Old, Tid}};
what(Tab, Tid, Obj, delete_object, _Old) ->
    {mnesia_table_event, {delete, Tab, Obj, [Obj], Tid}};
what(Tab, Tid, Obj, write, undefined) ->
    try	mnesia_lib:db_get(Tab, element(2, Obj)) of
	Old ->
	    {mnesia_table_event, {write, Tab, Obj, Old, Tid}}
    catch error:_ ->
	    ignore
    end;
what(Tab, Tid, Obj, write, Old) ->
    {mnesia_table_event, {write, Tab, Obj, Old, Tid}}.

deliver(_, ignore) -> 
    ok;
deliver([Pid | Pids], Msg) ->
    Pid ! Msg,
    deliver(Pids, Msg);
deliver([], _Msg) ->
    ok.

call(Msg) ->
    Pid = whereis(?MODULE),
    case Pid of
	undefined ->
	    {error, {node_not_running, node()}};
	Pid ->
	    Res = gen_server:call(Pid, Msg, infinity),
            %% We get an exit signal if server dies
            receive
                {'EXIT', _Pid, _Reason} ->
                    {error, {node_not_running, node()}}
            after 0 ->
                    Res
            end
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Callback functions from gen_server

%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          {stop, Reason}
%%----------------------------------------------------------------------
init([Parent]) ->
    process_flag(trap_exit, true),
    ClientPid = whereis(mnesia_event),
    link(ClientPid),
    mnesia_lib:verbose("~p starting: ~p~n", [?MODULE, self()]),
    Tab = ?ets_new_table(mnesia_subscr, [duplicate_bag, private]),
    ?ets_insert(Tab, {ClientPid, system}),
    {ok, #state{supervisor = Parent, pid_tab = Tab}}.

%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call({change, How}, _From, State) ->
    Reply = do_change(How, State#state.pid_tab),
    {reply, Reply, State};

handle_call(Msg, _From, State) ->
    error("~p got unexpected call: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(Msg, State) ->
    error("~p got unexpected cast: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%----------------------------------------------------------------------

handle_info({'EXIT', Pid, _R}, State) when Pid == State#state.supervisor ->
    {stop, shutdown, State};

handle_info({'EXIT', Pid, _Reason}, State) ->
    handle_exit(Pid, State#state.pid_tab),
    {noreply, State};

handle_info(Msg, State) ->
    error("~p got unexpected info: ~p~n", [?MODULE, Msg]),
    {noreply, State}.

%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(Reason, State) ->
    prepare_stop(State#state.pid_tab),
    mnesia_monitor:terminate_proc(?MODULE, Reason, State).

%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Upgrade process when its code is to be changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------

do_change({activate, ClientPid, system}, SubscrTab) when is_pid(ClientPid) ->
    Var = subscribers,
    activate(ClientPid, system, Var, subscribers(), SubscrTab);
do_change({activate, ClientPid, activity}, SubscrTab) when is_pid(ClientPid) ->
    Var = activity_subscribers,
    activate(ClientPid, activity, Var, mnesia_lib:val(Var), SubscrTab);
do_change({activate, ClientPid, {table, Tab, How}}, SubscrTab) when is_pid(ClientPid) ->
    case ?catch_val({Tab, where_to_read}) of
	Node when Node == node() ->
	    Var = {Tab, commit_work},
	    activate(ClientPid, {table, Tab, How}, Var, mnesia_lib:val(Var), SubscrTab);
	{'EXIT', _} ->
	    {error, {no_exists, Tab}};
	_Node ->
	    {error, {not_active_local, Tab}}
    end;
do_change({deactivate, ClientPid, system}, SubscrTab) ->
    Var = subscribers,
    deactivate(ClientPid, system, Var, SubscrTab);
do_change({deactivate, ClientPid, activity}, SubscrTab) ->
    Var = activity_subscribers,
    deactivate(ClientPid, activity, Var, SubscrTab);
do_change({deactivate, ClientPid, {table, Tab, How}}, SubscrTab) ->
    Var = {Tab, commit_work},
    deactivate(ClientPid, {table, Tab, How}, Var, SubscrTab);
do_change({deactivate_table, Tab}, SubscrTab) ->
    Var = {Tab, commit_work},
    case ?catch_val(Var) of
	{'EXIT', _} ->
	    {error, {no_exists, Tab}};
	CommitWork ->
	    case lists:keysearch(subscribers, 1, CommitWork) of
		false ->
		    ok;
		{value, Subs} -> 
		    Simple   = {table, Tab, simple}, 
		    Detailed = {table, Tab, detailed}, 
		    Fs = fun(C) -> deactivate(C, Simple, Var, SubscrTab) end,
		    Fd = fun(C) -> deactivate(C, Detailed, Var, SubscrTab) end,
		    case Subs of
			{subscribers, L1, L2} -> 
			    lists:foreach(Fs, L1),
			    lists:foreach(Fd, L2);
			{subscribers, L1} ->
			    lists:foreach(Fs, L1)
		    end
	    end,
	    {ok, node()}
    end;
do_change(_, _) ->
    {error, badarg}.

activate(ClientPid, What, Var, OldSubscribers, SubscrTab) ->
    Old = 
	if Var == subscribers orelse Var == activity_subscribers ->
		OldSubscribers;
	   true -> 
		case lists:keysearch(subscribers, 1, OldSubscribers) of
		    false -> [];
		{value, Subs} -> 
			case Subs of
			    {subscribers, L1, L2} -> 
				L1 ++ L2;
			    {subscribers, L1} ->
				L1
			end
		end
	end,
    case lists:member(ClientPid, Old) of
	false ->
	    %% Don't care about checking old links
	    try link(ClientPid) of
		true ->
		    ?ets_insert(SubscrTab, {ClientPid, What}),
		    add_subscr(Var, What, ClientPid),
		    {ok, node()}
	    catch error:_ ->
		    {error, {no_exists, ClientPid}}
	    end;
	true ->
	    {error, {already_exists, What}}
    end.

%%-record(subscribers, {pids = []}).  Old subscriber record removed
%% To solve backward compatibility, this code is a cludge.. 
add_subscr(subscribers, _What, Pid) ->
    mnesia_lib:add(subscribers, Pid),
    {ok, node()};
add_subscr(activity_subscribers, _What, Pid) ->
    mnesia_lib:add(activity_subscribers, Pid),
    {ok, node()};
add_subscr({Tab, commit_work}, What, Pid) ->
    Commit = mnesia_lib:val({Tab, commit_work}),
    case lists:keysearch(subscribers, 1, Commit) of
	false ->
	    Subscr = 
		case What of 
		    {table, _, simple} -> 
			{subscribers, [Pid], []};
		    {table, _, detailed} ->
			{subscribers, [], [Pid]}
		end,
	    mnesia_lib:add({Tab, subscribers}, Pid),
	    mnesia_lib:set({Tab, commit_work}, 
			   mnesia_lib:sort_commit([Subscr | Commit]));
	{value, Old} ->
	    {L1, L2} = 
		case Old of
		    {subscribers, L} ->  %% Old Way
			{L, []};
		    {subscribers, SL1, SL2} -> 
			{SL1, SL2}
		end,
	    Subscr = 
		case What of 
		    {table, _, simple} -> 
			{subscribers, [Pid | L1], L2};
		    {table, _, detailed} ->  
			{subscribers, L1, [Pid | L2]}
		end,
	    NewC  = lists:keyreplace(subscribers, 1, Commit, Subscr),
	    mnesia_lib:set({Tab, commit_work}, 
			   mnesia_lib:sort_commit(NewC)),
	    mnesia_lib:add({Tab, subscribers}, Pid)
    end.

deactivate(ClientPid, What, Var, SubscrTab) ->
    ?ets_match_delete(SubscrTab, {ClientPid, What}),
    try
	?ets_lookup_element(SubscrTab, ClientPid, 1),
	ignore
    catch error:_ -> unlink(ClientPid)
    end,
    try
	del_subscr(Var, What, ClientPid),
	{ok, node()}
    catch _:_ ->
	    {error, badarg}
    end.

del_subscr(subscribers, _What, Pid) ->
    mnesia_lib:del(subscribers, Pid);
del_subscr(activity_subscribers, _What, Pid) ->
    mnesia_lib:del(activity_subscribers, Pid);
del_subscr({Tab, commit_work}, What, Pid) ->
    Commit = mnesia_lib:val({Tab, commit_work}),
    case lists:keysearch(subscribers, 1, Commit) of
	false ->
	    false;
	{value, Old} ->
	    {L1, L2} = 
		case Old of
		    {subscribers, L} ->  %% Old Way
			{L, []};
		    {subscribers, SL1, SL2} -> 
			{SL1, SL2}
		end,
	    Subscr = 
		case What of %% Ignore user error delete subscr from any list
		    {table, _, simple} -> 
			NewL1 = lists:delete(Pid, L1),
			NewL2 = lists:delete(Pid, L2),
			{subscribers, NewL1, NewL2};
		    {table, _, detailed} ->
			NewL1 = lists:delete(Pid, L1),
			NewL2 = lists:delete(Pid, L2),
			{subscribers, NewL1, NewL2}
		end,
	    case Subscr of
		{subscribers, [], []} ->
		    NewC = lists:keydelete(subscribers, 1, Commit),
		    mnesia_lib:del({Tab, subscribers}, Pid),
		    mnesia_lib:set({Tab, commit_work}, 
				   mnesia_lib:sort_commit(NewC));
		_ ->
		    NewC = lists:keyreplace(subscribers, 1, Commit, Subscr),
		    mnesia_lib:del({Tab, subscribers}, Pid),
		    mnesia_lib:set({Tab, commit_work}, 
				   mnesia_lib:sort_commit(NewC))
	    end
    end.

handle_exit(ClientPid, SubscrTab) ->
    do_handle_exit(?ets_lookup(SubscrTab, ClientPid)),
    ?ets_delete(SubscrTab, ClientPid).

do_handle_exit([{ClientPid, What} | Tail]) ->
    case What of
	system ->
	    del_subscr(subscribers, What, ClientPid);
	activity ->
	    del_subscr(activity_subscribers, What, ClientPid);
	{_, Tab, _Level} ->
	    del_subscr({Tab, commit_work}, What, ClientPid)    
    end,
    do_handle_exit(Tail);
do_handle_exit([]) ->
    ok.

prepare_stop(SubscrTab) ->
    mnesia_lib:report_system_event({mnesia_down, node()}),
    do_prepare_stop(?ets_first(SubscrTab), SubscrTab).

do_prepare_stop('$end_of_table', _SubscrTab) ->
    ok;
do_prepare_stop(ClientPid, SubscrTab) ->
    Next = ?ets_next(SubscrTab, ClientPid),
    handle_exit(ClientPid, SubscrTab),
    unlink(ClientPid),
    do_prepare_stop(Next, SubscrTab).