From 2c3be1dc1a4af5028be7659587c5d5676b7eb83e Mon Sep 17 00:00:00 2001
From: Dan Gudmundsson <dgud@erlang.org>
Date: Thu, 15 Aug 2013 15:54:59 +0200
Subject: mnesia: Improve checkpoint activation

Fixed a race where some parts of a transaction could
be added to the checkpoint.

There are probably more races here but this improves the current
testcases.
---
 lib/mnesia/src/mnesia_checkpoint.erl | 150 ++++++++++++++++++++++-------------
 1 file changed, 97 insertions(+), 53 deletions(-)

(limited to 'lib')

diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl
index 89425d354c..2e39defcbd 100644
--- a/lib/mnesia/src/mnesia_checkpoint.erl
+++ b/lib/mnesia/src/mnesia_checkpoint.erl
@@ -127,6 +127,7 @@ tm_enter_pending(Pending) ->
 tm_enter_pending([], Pending) ->
     Pending;
 tm_enter_pending([Tab | Tabs], Pending) ->
+    %% io:format("Add ~p ~p ~p~n",[Tab, Pending, hd(tl(element(2, process_info(self(), current_stacktrace))))]),
     catch ?ets_insert(Tab, Pending),
     tm_enter_pending(Tabs, Pending).
 
@@ -771,104 +772,144 @@ retainer_delete({dets, Store}) ->
     Fname = tab2retainer(Store),
     file:delete(Fname).
 
-retainer_loop(Cp) ->
-    Name = Cp#checkpoint_args.name,
+retainer_loop(Cp = #checkpoint_args{is_activated=false, name=Name}) ->
     receive
-	{_From, {retain, Tid, Tab, Key, OldRecs}}
-	when Cp#checkpoint_args.wait_for_old == [] ->
-	    R = val({Tab, {retainer, Name}}),
-	    PendingTab = Cp#checkpoint_args.pending_tab,
-	    case R#retainer.really_retain of
-		true when PendingTab =:= undefined ->
-		    Store = R#retainer.store,
-		    case retainer_get(Store, Key) of
-			[] ->  retainer_put(Store, {Tab, Key, OldRecs});
-			_ ->   already_retained
-		    end;
-		true ->
-		    case ets:member(PendingTab, Tid) of
-			true -> ignore;
-			false -> 
-			    Store = R#retainer.store,
-			    case retainer_get(Store, Key) of
-				[] ->  retainer_put(Store, {Tab, Key, OldRecs});
-				_ ->   already_retained
-			    end			
-		    end;
-		false ->
-		    ignore
-	    end,
-	    retainer_loop(Cp);
-	
 	%% Adm
+	{From, {activate, Pending}} ->
+            StillPending = mnesia_recover:still_pending(Pending),
+            enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
+            Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}),
+
+            reply(From, Name, activated),
+	    retainer_loop(Cp2);
+
+	{_From, {exit_pending, Tid}} when is_list(Cp#checkpoint_args.wait_for_old) ->
+	    StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
+	    Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
+	    Cp3 = maybe_activate(Cp2),
+	    retainer_loop(Cp3);
+
 	{From, deactivate} ->
 	    do_stop(Cp),
 	    reply(From, Name, deactivated),
 	    unlink(From),
 	    exit(shutdown);
 
+	{From, get_checkpoint} ->
+	    reply(From, Name, Cp),
+	    retainer_loop(Cp);
+	{_From, {add_retainer, R, Node}} ->
+	    Cp2 = do_add_retainer(Cp, R, Node),
+	    retainer_loop(Cp2);
+
+	{From, collect_pending} ->
+	    PendingTab = Cp#checkpoint_args.pending_tab,
+	    del(pending_checkpoints, PendingTab),
+	    Pending = ?ets_match_object(PendingTab, '_'),
+	    reply(From, Name, {ok, Pending}),
+	    retainer_loop(Cp);
+
+	{_From, {mnesia_down, Node}} ->
+	    Cp2 = do_del_retainers(Cp, Node),
+	    retainer_loop(Cp2);
+
 	{'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
 	    %% do_stop(Cp),
 	    %% assume that entire Mnesia is terminating
 	    exit(shutdown);
 
-	{_From, {mnesia_down, Node}} ->
-	    Cp2 = do_del_retainers(Cp, Node),
-	    retainer_loop(Cp2);
+	{'EXIT', From, _Reason} ->
+	    Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
+			     check_iter(From, Iter)],
+	    retainer_loop(Cp#checkpoint_args{iterators = Iters});
+
+	{system, From, Msg} ->
+	    dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
+	    sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
+				  ?MODULE, [], Cp)
+    end;
+
+retainer_loop(Cp = #checkpoint_args{name=Name}) ->
+    receive
+	{_From, {retain, Tid, Tab, Key, OldRecs}} ->
+	    R = val({Tab, {retainer, Name}}),
+	    PendingTab = Cp#checkpoint_args.pending_tab,
+	    case R#retainer.really_retain of
+		true ->
+		    Store = R#retainer.store,
+		    try true = ets:member(PendingTab, Tid),
+			 %% io:format("CP: ~p ~p ~p ~p~n",[true, Tab, Key, Tid]),
+			 case retainer_get(Store, Key) of
+			     [] -> ignore;
+			     _  -> ets:delete(element(2,Store), Key)
+			 end
+		    catch _:_ ->
+			    %% io:format("CP: ~p ~p ~p ~p~n",[false, Tab, Key, Tid]),
+			    case retainer_get(Store, Key) of
+				[] -> retainer_put(Store, {Tab, Key, OldRecs});
+				_  -> already_retained
+			    end
+		    end;
+		false ->
+		    ignore
+	    end,
+	    retainer_loop(Cp);
+
+	%% Adm
 	{From, get_checkpoint} ->
 	    reply(From, Name, Cp),
 	    retainer_loop(Cp);
-	{From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{From, {add_copy, Tab, Node}} ->
 	    {Res, Cp2} = do_add_copy(Cp, Tab, Node),
 	    reply(From, Name, Res),
 	    retainer_loop(Cp2);
-	{From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{From, {del_copy, Tab, Node}} ->
 	    Cp2 = do_del_copy(Cp, Tab, Node),
 	    reply(From, Name, ok),
 	    retainer_loop(Cp2);
-	{From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{From, {change_copy, Tab, From, To}} ->
 	    Cp2 = do_change_copy(Cp, Tab, From, To),
 	    reply(From, Name, ok),
 	    retainer_loop(Cp2);
 	{_From, {add_retainer, R, Node}} ->
 	    Cp2 = do_add_retainer(Cp, R, Node),
 	    retainer_loop(Cp2);
-	{_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{_From, {del_retainer, R, Node}} ->
 	    Cp2 = do_del_retainer(Cp, R, Node),
 	    retainer_loop(Cp2);
 
 	%% Iteration
-	{From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{From, {iter_begin, Iter}} ->
 	    Cp2 = iter_begin(Cp, From, Iter),
 	    retainer_loop(Cp2);
 
-	{From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
+	{From, {iter_end, Iter}}  ->
 	    retainer_fixtable(Iter#iter.oid_tab, false),
 	    Iters = Cp#checkpoint_args.iterators -- [Iter],
 	    reply(From, Name, ok),
-	    retainer_loop(Cp#checkpoint_args{iterators = Iters});	
+	    retainer_loop(Cp#checkpoint_args{iterators = Iters});
 
-	{_From, {exit_pending, Tid}}
-	    when is_list(Cp#checkpoint_args.wait_for_old) ->
+	{_From, {exit_pending, Tid}} ->
 	    StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
 	    Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
 	    Cp3 = maybe_activate(Cp2),
 	    retainer_loop(Cp3);
 
-	{From, collect_pending} ->
-	    PendingTab = Cp#checkpoint_args.pending_tab,
-	    del(pending_checkpoints, PendingTab),
-	    Pending = ?ets_match_object(PendingTab, '_'),
-	    reply(From, Name, {ok, Pending}),
-	    retainer_loop(Cp);
+	{From, deactivate} ->
+	    do_stop(Cp),
+	    reply(From, Name, deactivated),
+	    unlink(From),
+	    exit(shutdown);
 
-	{From, {activate, Pending}} ->
-            StillPending = mnesia_recover:still_pending(Pending),
-            enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
-            Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}),
-            reply(From, Name, activated),
+	{_From, {mnesia_down, Node}} ->
+	    Cp2 = do_del_retainers(Cp, Node),
 	    retainer_loop(Cp2);
 
+	{'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
+	    %% do_stop(Cp),
+	    %% assume that entire Mnesia is terminating
+	    exit(shutdown);
+
 	{'EXIT', From, _Reason} ->
 	    Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
 			     check_iter(From, Iter)],
@@ -877,13 +918,16 @@ retainer_loop(Cp) ->
 	{system, From, Msg} ->
 	    dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
 	    sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
-				  ?MODULE, [], Cp)
+				  ?MODULE, [], Cp);
+	Msg ->
+	    dbg_out("~p got ~p~n", [?MODULE, Msg])
     end.
 
 maybe_activate(Cp)
   when Cp#checkpoint_args.wait_for_old == [],
        Cp#checkpoint_args.is_activated == false ->
-    Cp#checkpoint_args{pending_tab = undefined, is_activated = true};
+    Cp#checkpoint_args{%% pending_tab = undefined,
+		       is_activated = true};
 maybe_activate(Cp) ->
     Cp.
 
-- 
cgit v1.2.3