diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_checkpoint.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_checkpoint.erl | 150 |
1 files changed, 97 insertions, 53 deletions
diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl index 89425d354c..2e39defcbd 100644 --- a/lib/mnesia/src/mnesia_checkpoint.erl +++ b/lib/mnesia/src/mnesia_checkpoint.erl @@ -127,6 +127,7 @@ tm_enter_pending(Pending) -> tm_enter_pending([], Pending) -> Pending; tm_enter_pending([Tab | Tabs], Pending) -> + %% io:format("Add ~p ~p ~p~n",[Tab, Pending, hd(tl(element(2, process_info(self(), current_stacktrace))))]), catch ?ets_insert(Tab, Pending), tm_enter_pending(Tabs, Pending). @@ -771,104 +772,144 @@ retainer_delete({dets, Store}) -> Fname = tab2retainer(Store), file:delete(Fname). -retainer_loop(Cp) -> - Name = Cp#checkpoint_args.name, +retainer_loop(Cp = #checkpoint_args{is_activated=false, name=Name}) -> receive - {_From, {retain, Tid, Tab, Key, OldRecs}} - when Cp#checkpoint_args.wait_for_old == [] -> - R = val({Tab, {retainer, Name}}), - PendingTab = Cp#checkpoint_args.pending_tab, - case R#retainer.really_retain of - true when PendingTab =:= undefined -> - Store = R#retainer.store, - case retainer_get(Store, Key) of - [] -> retainer_put(Store, {Tab, Key, OldRecs}); - _ -> already_retained - end; - true -> - case ets:member(PendingTab, Tid) of - true -> ignore; - false -> - Store = R#retainer.store, - case retainer_get(Store, Key) of - [] -> retainer_put(Store, {Tab, Key, OldRecs}); - _ -> already_retained - end - end; - false -> - ignore - end, - retainer_loop(Cp); - %% Adm + {From, {activate, Pending}} -> + StillPending = mnesia_recover:still_pending(Pending), + enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab), + Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}), + + reply(From, Name, activated), + retainer_loop(Cp2); + + {_From, {exit_pending, Tid}} when is_list(Cp#checkpoint_args.wait_for_old) -> + StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old), + Cp2 = Cp#checkpoint_args{wait_for_old = StillPending}, + Cp3 = maybe_activate(Cp2), + retainer_loop(Cp3); + {From, deactivate} -> do_stop(Cp), reply(From, Name, deactivated), unlink(From), exit(shutdown); + {From, get_checkpoint} -> + reply(From, Name, Cp), + retainer_loop(Cp); + {_From, {add_retainer, R, Node}} -> + Cp2 = do_add_retainer(Cp, R, Node), + retainer_loop(Cp2); + + {From, collect_pending} -> + PendingTab = Cp#checkpoint_args.pending_tab, + del(pending_checkpoints, PendingTab), + Pending = ?ets_match_object(PendingTab, '_'), + reply(From, Name, {ok, Pending}), + retainer_loop(Cp); + + {_From, {mnesia_down, Node}} -> + Cp2 = do_del_retainers(Cp, Node), + retainer_loop(Cp2); + {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor -> %% do_stop(Cp), %% assume that entire Mnesia is terminating exit(shutdown); - {_From, {mnesia_down, Node}} -> - Cp2 = do_del_retainers(Cp, Node), - retainer_loop(Cp2); + {'EXIT', From, _Reason} -> + Iters = [Iter || Iter <- Cp#checkpoint_args.iterators, + check_iter(From, Iter)], + retainer_loop(Cp#checkpoint_args{iterators = Iters}); + + {system, From, Msg} -> + dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]), + sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor, + ?MODULE, [], Cp) + end; + +retainer_loop(Cp = #checkpoint_args{name=Name}) -> + receive + {_From, {retain, Tid, Tab, Key, OldRecs}} -> + R = val({Tab, {retainer, Name}}), + PendingTab = Cp#checkpoint_args.pending_tab, + case R#retainer.really_retain of + true -> + Store = R#retainer.store, + try true = ets:member(PendingTab, Tid), + %% io:format("CP: ~p ~p ~p ~p~n",[true, Tab, Key, Tid]), + case retainer_get(Store, Key) of + [] -> ignore; + _ -> ets:delete(element(2,Store), Key) + end + catch _:_ -> + %% io:format("CP: ~p ~p ~p ~p~n",[false, Tab, Key, Tid]), + case retainer_get(Store, Key) of + [] -> retainer_put(Store, {Tab, Key, OldRecs}); + _ -> already_retained + end + end; + false -> + ignore + end, + retainer_loop(Cp); + + %% Adm {From, get_checkpoint} -> reply(From, Name, Cp), retainer_loop(Cp); - {From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {add_copy, Tab, Node}} -> {Res, Cp2} = do_add_copy(Cp, Tab, Node), reply(From, Name, Res), retainer_loop(Cp2); - {From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {del_copy, Tab, Node}} -> Cp2 = do_del_copy(Cp, Tab, Node), reply(From, Name, ok), retainer_loop(Cp2); - {From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {change_copy, Tab, From, To}} -> Cp2 = do_change_copy(Cp, Tab, From, To), reply(From, Name, ok), retainer_loop(Cp2); {_From, {add_retainer, R, Node}} -> Cp2 = do_add_retainer(Cp, R, Node), retainer_loop(Cp2); - {_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {_From, {del_retainer, R, Node}} -> Cp2 = do_del_retainer(Cp, R, Node), retainer_loop(Cp2); %% Iteration - {From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {iter_begin, Iter}} -> Cp2 = iter_begin(Cp, From, Iter), retainer_loop(Cp2); - {From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {iter_end, Iter}} -> retainer_fixtable(Iter#iter.oid_tab, false), Iters = Cp#checkpoint_args.iterators -- [Iter], reply(From, Name, ok), - retainer_loop(Cp#checkpoint_args{iterators = Iters}); + retainer_loop(Cp#checkpoint_args{iterators = Iters}); - {_From, {exit_pending, Tid}} - when is_list(Cp#checkpoint_args.wait_for_old) -> + {_From, {exit_pending, Tid}} -> StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old), Cp2 = Cp#checkpoint_args{wait_for_old = StillPending}, Cp3 = maybe_activate(Cp2), retainer_loop(Cp3); - {From, collect_pending} -> - PendingTab = Cp#checkpoint_args.pending_tab, - del(pending_checkpoints, PendingTab), - Pending = ?ets_match_object(PendingTab, '_'), - reply(From, Name, {ok, Pending}), - retainer_loop(Cp); + {From, deactivate} -> + do_stop(Cp), + reply(From, Name, deactivated), + unlink(From), + exit(shutdown); - {From, {activate, Pending}} -> - StillPending = mnesia_recover:still_pending(Pending), - enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab), - Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}), - reply(From, Name, activated), + {_From, {mnesia_down, Node}} -> + Cp2 = do_del_retainers(Cp, Node), retainer_loop(Cp2); + {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor -> + %% do_stop(Cp), + %% assume that entire Mnesia is terminating + exit(shutdown); + {'EXIT', From, _Reason} -> Iters = [Iter || Iter <- Cp#checkpoint_args.iterators, check_iter(From, Iter)], @@ -877,13 +918,16 @@ retainer_loop(Cp) -> {system, From, Msg} -> dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]), sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor, - ?MODULE, [], Cp) + ?MODULE, [], Cp); + Msg -> + dbg_out("~p got ~p~n", [?MODULE, Msg]) end. maybe_activate(Cp) when Cp#checkpoint_args.wait_for_old == [], Cp#checkpoint_args.is_activated == false -> - Cp#checkpoint_args{pending_tab = undefined, is_activated = true}; + Cp#checkpoint_args{%% pending_tab = undefined, + is_activated = true}; maybe_activate(Cp) -> Cp. |