diff options
author | Dan Gudmundsson <[email protected]> | 2013-08-29 10:50:00 +0200 |
---|---|---|
committer | Dan Gudmundsson <[email protected]> | 2013-08-29 10:50:00 +0200 |
commit | 18d4e3e1a8aa59afbf360c41a540062194ff3dd1 (patch) | |
tree | d7fc6345dbdf93b972fa00eec01cc2273bdb207b /lib/mnesia/src/mnesia_checkpoint.erl | |
parent | 3bbfd3bd72601f7881115284ebf41bc25711a6d8 (diff) | |
parent | 6a6bc2560c60ea790780dcfbc91336a734eff1be (diff) | |
download | otp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.tar.gz otp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.tar.bz2 otp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.zip |
Merge branch 'maint'
Diffstat (limited to 'lib/mnesia/src/mnesia_checkpoint.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_checkpoint.erl | 279 |
1 files changed, 118 insertions, 161 deletions
diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl index eb8fe38908..173e3be2f5 100644 --- a/lib/mnesia/src/mnesia_checkpoint.erl +++ b/lib/mnesia/src/mnesia_checkpoint.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% Copyright Ericsson AB 1996-2013 %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -31,8 +31,7 @@ tm_retain/5, tm_enter_pending/1, tm_enter_pending/3, - tm_exit_pending/1, - convert_cp_record/1 + tm_exit_pending/1 ]). %% Public interface @@ -88,25 +87,6 @@ pid }). -%% Old record definition --record(checkpoint, {name, - allow_remote, - ram_overrides_dump, - nodes, - node, - now, - min, - max, - pending_tab, - wait_for_old, - is_activated, - ignore_new, - retainers, - iterators, - supervisor, - pid - }). - -record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}). -record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}). @@ -129,15 +109,6 @@ tm_prepare(Cp) when is_record(Cp, checkpoint_args) -> start_retainer(Cp); true -> {error, {already_exists, Name, node()}} - end; -tm_prepare(Cp) when is_record(Cp, checkpoint) -> - %% Node with old protocol sent an old checkpoint record - %% and we have to convert it - case convert_cp_record(Cp) of - {ok, NewCp} -> - tm_prepare(NewCp); - {error, Reason} -> - {error, Reason} end. tm_mnesia_down(Node) -> @@ -156,6 +127,7 @@ tm_enter_pending(Pending) -> tm_enter_pending([], Pending) -> Pending; tm_enter_pending([Tab | Tabs], Pending) -> + %% io:format("Add ~p ~p ~p~n",[Tab, Pending, hd(tl(element(2, process_info(self(), current_stacktrace))))]), catch ?ets_insert(Tab, Pending), tm_enter_pending(Tabs, Pending). @@ -371,9 +343,7 @@ activate(Args) -> end. args2cp(Args) when is_list(Args)-> - case catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of - {'EXIT', Reason} -> - {error, Reason}; + try lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of Cp -> case check_tables(Cp) of {error, Reason} -> @@ -381,6 +351,10 @@ args2cp(Args) when is_list(Args)-> {ok, Overriders, AllTabs} -> arrange_retainers(Cp, Overriders, AllTabs) end + catch exit:Reason -> + {error, Reason}; + error:Reason -> + {error, Reason} end; args2cp(Args) -> {error, {badarg, Args}}. @@ -390,10 +364,10 @@ check_arg({name, Name}, Cp) -> true -> exit({already_exists, Name}); false -> - case catch tab2retainer({foo, Name}) of - List when is_list(List) -> - Cp#checkpoint_args{name = Name}; - _ -> + try + [_|_] = tab2retainer({foo, Name}), + Cp#checkpoint_args{name = Name} + catch _:_ -> exit({badarg, Name}) end end; @@ -641,11 +615,7 @@ init(Cp) -> process_flag(priority, high), %% Needed dets files might starve the system Name = Cp#checkpoint_args.name, Props = [set, public, {keypos, 2}], - case catch ?ets_new_table(mnesia_pending_checkpoint, Props) of - {'EXIT', Reason} -> %% system limit - Msg = "Cannot create an ets table for pending transactions", - Error = {error, {system_limit, Name, Msg, Reason}}, - proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error); + try ?ets_new_table(mnesia_pending_checkpoint, Props) of PendingTab -> Rs = [prepare_tab(Cp, R) || R <- Cp#checkpoint_args.retainers], Cp2 = Cp#checkpoint_args{retainers = Rs, @@ -658,6 +628,10 @@ init(Cp) -> dbg_out("Checkpoint ~p (~p) started~n", [Name, self()]), proc_lib:init_ack(Cp2#checkpoint_args.supervisor, {ok, self()}), retainer_loop(Cp2) + catch error:Reason -> %% system limit + Msg = "Cannot create an ets table for pending transactions", + Error = {error, {system_limit, Name, Msg, Reason}}, + proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error) end. prepare_tab(Cp, R) -> @@ -798,104 +772,142 @@ retainer_delete({dets, Store}) -> Fname = tab2retainer(Store), file:delete(Fname). -retainer_loop(Cp) -> - Name = Cp#checkpoint_args.name, +retainer_loop(Cp = #checkpoint_args{is_activated=false, name=Name}) -> receive - {_From, {retain, Tid, Tab, Key, OldRecs}} - when Cp#checkpoint_args.wait_for_old == [] -> - R = val({Tab, {retainer, Name}}), - PendingTab = Cp#checkpoint_args.pending_tab, - case R#retainer.really_retain of - true when PendingTab =:= undefined -> - Store = R#retainer.store, - case retainer_get(Store, Key) of - [] -> retainer_put(Store, {Tab, Key, OldRecs}); - _ -> already_retained - end; - true -> - case ets:member(PendingTab, Tid) of - true -> ignore; - false -> - Store = R#retainer.store, - case retainer_get(Store, Key) of - [] -> retainer_put(Store, {Tab, Key, OldRecs}); - _ -> already_retained - end - end; - false -> - ignore - end, - retainer_loop(Cp); - %% Adm + {From, {activate, Pending}} -> + StillPending = mnesia_recover:still_pending(Pending), + enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab), + Local = [Tid || #tid{pid=Pid} = Tid <- StillPending, node(Pid) =/= node()], + Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = Local}), + + reply(From, Name, activated), + retainer_loop(Cp2); + + {_From, {exit_pending, Tid}} when is_list(Cp#checkpoint_args.wait_for_old) -> + StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old), + Cp2 = Cp#checkpoint_args{wait_for_old = StillPending}, + Cp3 = maybe_activate(Cp2), + retainer_loop(Cp3); + {From, deactivate} -> do_stop(Cp), reply(From, Name, deactivated), unlink(From), exit(shutdown); + {From, get_checkpoint} -> + reply(From, Name, Cp), + retainer_loop(Cp); + {_From, {add_retainer, R, Node}} -> + Cp2 = do_add_retainer(Cp, R, Node), + retainer_loop(Cp2); + + {From, collect_pending} -> + PendingTab = Cp#checkpoint_args.pending_tab, + del(pending_checkpoints, PendingTab), + Pending = ?ets_match_object(PendingTab, '_'), + reply(From, Name, {ok, Pending}), + retainer_loop(Cp); + + {_From, {mnesia_down, Node}} -> + Cp2 = do_del_retainers(Cp, Node), + retainer_loop(Cp2); + {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor -> %% do_stop(Cp), %% assume that entire Mnesia is terminating exit(shutdown); - {_From, {mnesia_down, Node}} -> - Cp2 = do_del_retainers(Cp, Node), - retainer_loop(Cp2); + {'EXIT', From, _Reason} -> + Iters = [Iter || Iter <- Cp#checkpoint_args.iterators, + check_iter(From, Iter)], + retainer_loop(Cp#checkpoint_args{iterators = Iters}); + + {system, From, Msg} -> + dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]), + sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor, + ?MODULE, [], Cp) + end; + +retainer_loop(Cp = #checkpoint_args{name=Name}) -> + receive + {_From, {retain, Tid, Tab, Key, OldRecs}} -> + R = val({Tab, {retainer, Name}}), + PendingTab = Cp#checkpoint_args.pending_tab, + case R#retainer.really_retain of + true -> + Store = R#retainer.store, + try true = ets:member(PendingTab, Tid), + %% io:format("CP: ~p ~p ~p ~p~n",[true, Tab, Key, Tid]), + case retainer_get(Store, Key) of + [] -> ignore; + _ -> ets:delete(element(2,Store), Key) + end + catch _:_ -> + %% io:format("CP: ~p ~p ~p ~p~n",[false, Tab, Key, Tid]), + case retainer_get(Store, Key) of + [] -> retainer_put(Store, {Tab, Key, OldRecs}); + _ -> already_retained + end + end; + false -> + ignore + end, + retainer_loop(Cp); + + %% Adm {From, get_checkpoint} -> reply(From, Name, Cp), retainer_loop(Cp); - {From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {add_copy, Tab, Node}} -> {Res, Cp2} = do_add_copy(Cp, Tab, Node), reply(From, Name, Res), retainer_loop(Cp2); - {From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {del_copy, Tab, Node}} -> Cp2 = do_del_copy(Cp, Tab, Node), reply(From, Name, ok), retainer_loop(Cp2); - {From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {change_copy, Tab, From, To}} -> Cp2 = do_change_copy(Cp, Tab, From, To), reply(From, Name, ok), retainer_loop(Cp2); {_From, {add_retainer, R, Node}} -> Cp2 = do_add_retainer(Cp, R, Node), retainer_loop(Cp2); - {_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] -> + {_From, {del_retainer, R, Node}} -> Cp2 = do_del_retainer(Cp, R, Node), retainer_loop(Cp2); %% Iteration - {From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {iter_begin, Iter}} -> Cp2 = iter_begin(Cp, From, Iter), retainer_loop(Cp2); - {From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] -> + {From, {iter_end, Iter}} -> retainer_fixtable(Iter#iter.oid_tab, false), Iters = Cp#checkpoint_args.iterators -- [Iter], reply(From, Name, ok), - retainer_loop(Cp#checkpoint_args{iterators = Iters}); - - {_From, {exit_pending, Tid}} - when is_list(Cp#checkpoint_args.wait_for_old) -> - StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old), - Cp2 = Cp#checkpoint_args{wait_for_old = StillPending}, - Cp3 = maybe_activate(Cp2), - retainer_loop(Cp3); + retainer_loop(Cp#checkpoint_args{iterators = Iters}); - {From, collect_pending} -> - PendingTab = Cp#checkpoint_args.pending_tab, - del(pending_checkpoints, PendingTab), - Pending = ?ets_match_object(PendingTab, '_'), - reply(From, Name, {ok, Pending}), + {_From, {exit_pending, _Tid}} -> retainer_loop(Cp); - {From, {activate, Pending}} -> - StillPending = mnesia_recover:still_pending(Pending), - enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab), - Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}), - reply(From, Name, activated), + {From, deactivate} -> + do_stop(Cp), + reply(From, Name, deactivated), + unlink(From), + exit(shutdown); + + {_From, {mnesia_down, Node}} -> + Cp2 = do_del_retainers(Cp, Node), retainer_loop(Cp2); + {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor -> + %% do_stop(Cp), + %% assume that entire Mnesia is terminating + exit(shutdown); + {'EXIT', From, _Reason} -> Iters = [Iter || Iter <- Cp#checkpoint_args.iterators, check_iter(From, Iter)], @@ -903,13 +915,17 @@ retainer_loop(Cp) -> {system, From, Msg} -> dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]), - sys:handle_system_msg(Msg, From, no_parent, ?MODULE, [], Cp) + sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor, + ?MODULE, [], Cp); + Msg -> + dbg_out("~p got ~p~n", [?MODULE, Msg]) end. maybe_activate(Cp) - when Cp#checkpoint_args.wait_for_old == [], - Cp#checkpoint_args.is_activated == false -> - Cp#checkpoint_args{pending_tab = undefined, is_activated = true}; + when Cp#checkpoint_args.wait_for_old == [], + Cp#checkpoint_args.is_activated == false -> + Cp#checkpoint_args{%% pending_tab = undefined, + is_activated = true}; maybe_activate(Cp) -> Cp. @@ -1226,65 +1242,6 @@ system_terminate(_Reason, _Parent,_Debug, Cp) -> system_code_change(Cp, _Module, _OldVsn, _Extra) -> {ok, Cp}. -convert_cp_record(Cp) when is_record(Cp, checkpoint) -> - ROD = - case Cp#checkpoint.ram_overrides_dump of - true -> Cp#checkpoint.min ++ Cp#checkpoint.max; - false -> [] - end, - - {ok, #checkpoint_args{name = Cp#checkpoint.name, - allow_remote = Cp#checkpoint.name, - ram_overrides_dump = ROD, - nodes = Cp#checkpoint.nodes, - node = Cp#checkpoint.node, - now = Cp#checkpoint.now, - cookie = ?unique_cookie, - min = Cp#checkpoint.min, - max = Cp#checkpoint.max, - pending_tab = Cp#checkpoint.pending_tab, - wait_for_old = Cp#checkpoint.wait_for_old, - is_activated = Cp#checkpoint.is_activated, - ignore_new = Cp#checkpoint.ignore_new, - retainers = Cp#checkpoint.retainers, - iterators = Cp#checkpoint.iterators, - supervisor = Cp#checkpoint.supervisor, - pid = Cp#checkpoint.pid - }}; -convert_cp_record(Cp) when is_record(Cp, checkpoint_args) -> - AllTabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max, - ROD = case Cp#checkpoint_args.ram_overrides_dump of - [] -> - false; - AllTabs -> - true; - _ -> - error - end, - if - ROD == error -> - {error, {"Old node cannot handle new checkpoint protocol", - ram_overrides_dump}}; - true -> - {ok, #checkpoint{name = Cp#checkpoint_args.name, - allow_remote = Cp#checkpoint_args.name, - ram_overrides_dump = ROD, - nodes = Cp#checkpoint_args.nodes, - node = Cp#checkpoint_args.node, - now = Cp#checkpoint_args.now, - min = Cp#checkpoint_args.min, - max = Cp#checkpoint_args.max, - pending_tab = Cp#checkpoint_args.pending_tab, - wait_for_old = Cp#checkpoint_args.wait_for_old, - is_activated = Cp#checkpoint_args.is_activated, - ignore_new = Cp#checkpoint_args.ignore_new, - retainers = Cp#checkpoint_args.retainers, - iterators = Cp#checkpoint_args.iterators, - supervisor = Cp#checkpoint_args.supervisor, - pid = Cp#checkpoint_args.pid - }} - end. - %%%%%%%%%%%%%%%%%%%%%%%%%% val(Var) -> |