aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src/mnesia_checkpoint.erl
diff options
context:
space:
mode:
authorDan Gudmundsson <[email protected]>2013-08-29 10:50:00 +0200
committerDan Gudmundsson <[email protected]>2013-08-29 10:50:00 +0200
commit18d4e3e1a8aa59afbf360c41a540062194ff3dd1 (patch)
treed7fc6345dbdf93b972fa00eec01cc2273bdb207b /lib/mnesia/src/mnesia_checkpoint.erl
parent3bbfd3bd72601f7881115284ebf41bc25711a6d8 (diff)
parent6a6bc2560c60ea790780dcfbc91336a734eff1be (diff)
downloadotp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.tar.gz
otp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.tar.bz2
otp-18d4e3e1a8aa59afbf360c41a540062194ff3dd1.zip
Merge branch 'maint'
Diffstat (limited to 'lib/mnesia/src/mnesia_checkpoint.erl')
-rw-r--r--lib/mnesia/src/mnesia_checkpoint.erl279
1 files changed, 118 insertions, 161 deletions
diff --git a/lib/mnesia/src/mnesia_checkpoint.erl b/lib/mnesia/src/mnesia_checkpoint.erl
index eb8fe38908..173e3be2f5 100644
--- a/lib/mnesia/src/mnesia_checkpoint.erl
+++ b/lib/mnesia/src/mnesia_checkpoint.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2013
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -31,8 +31,7 @@
tm_retain/5,
tm_enter_pending/1,
tm_enter_pending/3,
- tm_exit_pending/1,
- convert_cp_record/1
+ tm_exit_pending/1
]).
%% Public interface
@@ -88,25 +87,6 @@
pid
}).
-%% Old record definition
--record(checkpoint, {name,
- allow_remote,
- ram_overrides_dump,
- nodes,
- node,
- now,
- min,
- max,
- pending_tab,
- wait_for_old,
- is_activated,
- ignore_new,
- retainers,
- iterators,
- supervisor,
- pid
- }).
-
-record(retainer, {cp_name, tab_name, store, writers = [], really_retain = true}).
-record(iter, {tab_name, oid_tab, main_tab, retainer_tab, source, val, pid}).
@@ -129,15 +109,6 @@ tm_prepare(Cp) when is_record(Cp, checkpoint_args) ->
start_retainer(Cp);
true ->
{error, {already_exists, Name, node()}}
- end;
-tm_prepare(Cp) when is_record(Cp, checkpoint) ->
- %% Node with old protocol sent an old checkpoint record
- %% and we have to convert it
- case convert_cp_record(Cp) of
- {ok, NewCp} ->
- tm_prepare(NewCp);
- {error, Reason} ->
- {error, Reason}
end.
tm_mnesia_down(Node) ->
@@ -156,6 +127,7 @@ tm_enter_pending(Pending) ->
tm_enter_pending([], Pending) ->
Pending;
tm_enter_pending([Tab | Tabs], Pending) ->
+ %% io:format("Add ~p ~p ~p~n",[Tab, Pending, hd(tl(element(2, process_info(self(), current_stacktrace))))]),
catch ?ets_insert(Tab, Pending),
tm_enter_pending(Tabs, Pending).
@@ -371,9 +343,7 @@ activate(Args) ->
end.
args2cp(Args) when is_list(Args)->
- case catch lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of
- {'EXIT', Reason} ->
- {error, Reason};
+ try lists:foldl(fun check_arg/2, #checkpoint_args{}, Args) of
Cp ->
case check_tables(Cp) of
{error, Reason} ->
@@ -381,6 +351,10 @@ args2cp(Args) when is_list(Args)->
{ok, Overriders, AllTabs} ->
arrange_retainers(Cp, Overriders, AllTabs)
end
+ catch exit:Reason ->
+ {error, Reason};
+ error:Reason ->
+ {error, Reason}
end;
args2cp(Args) ->
{error, {badarg, Args}}.
@@ -390,10 +364,10 @@ check_arg({name, Name}, Cp) ->
true ->
exit({already_exists, Name});
false ->
- case catch tab2retainer({foo, Name}) of
- List when is_list(List) ->
- Cp#checkpoint_args{name = Name};
- _ ->
+ try
+ [_|_] = tab2retainer({foo, Name}),
+ Cp#checkpoint_args{name = Name}
+ catch _:_ ->
exit({badarg, Name})
end
end;
@@ -641,11 +615,7 @@ init(Cp) ->
process_flag(priority, high), %% Needed dets files might starve the system
Name = Cp#checkpoint_args.name,
Props = [set, public, {keypos, 2}],
- case catch ?ets_new_table(mnesia_pending_checkpoint, Props) of
- {'EXIT', Reason} -> %% system limit
- Msg = "Cannot create an ets table for pending transactions",
- Error = {error, {system_limit, Name, Msg, Reason}},
- proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error);
+ try ?ets_new_table(mnesia_pending_checkpoint, Props) of
PendingTab ->
Rs = [prepare_tab(Cp, R) || R <- Cp#checkpoint_args.retainers],
Cp2 = Cp#checkpoint_args{retainers = Rs,
@@ -658,6 +628,10 @@ init(Cp) ->
dbg_out("Checkpoint ~p (~p) started~n", [Name, self()]),
proc_lib:init_ack(Cp2#checkpoint_args.supervisor, {ok, self()}),
retainer_loop(Cp2)
+ catch error:Reason -> %% system limit
+ Msg = "Cannot create an ets table for pending transactions",
+ Error = {error, {system_limit, Name, Msg, Reason}},
+ proc_lib:init_ack(Cp#checkpoint_args.supervisor, Error)
end.
prepare_tab(Cp, R) ->
@@ -798,104 +772,142 @@ retainer_delete({dets, Store}) ->
Fname = tab2retainer(Store),
file:delete(Fname).
-retainer_loop(Cp) ->
- Name = Cp#checkpoint_args.name,
+retainer_loop(Cp = #checkpoint_args{is_activated=false, name=Name}) ->
receive
- {_From, {retain, Tid, Tab, Key, OldRecs}}
- when Cp#checkpoint_args.wait_for_old == [] ->
- R = val({Tab, {retainer, Name}}),
- PendingTab = Cp#checkpoint_args.pending_tab,
- case R#retainer.really_retain of
- true when PendingTab =:= undefined ->
- Store = R#retainer.store,
- case retainer_get(Store, Key) of
- [] -> retainer_put(Store, {Tab, Key, OldRecs});
- _ -> already_retained
- end;
- true ->
- case ets:member(PendingTab, Tid) of
- true -> ignore;
- false ->
- Store = R#retainer.store,
- case retainer_get(Store, Key) of
- [] -> retainer_put(Store, {Tab, Key, OldRecs});
- _ -> already_retained
- end
- end;
- false ->
- ignore
- end,
- retainer_loop(Cp);
-
%% Adm
+ {From, {activate, Pending}} ->
+ StillPending = mnesia_recover:still_pending(Pending),
+ enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
+ Local = [Tid || #tid{pid=Pid} = Tid <- StillPending, node(Pid) =/= node()],
+ Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = Local}),
+
+ reply(From, Name, activated),
+ retainer_loop(Cp2);
+
+ {_From, {exit_pending, Tid}} when is_list(Cp#checkpoint_args.wait_for_old) ->
+ StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
+ Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
+ Cp3 = maybe_activate(Cp2),
+ retainer_loop(Cp3);
+
{From, deactivate} ->
do_stop(Cp),
reply(From, Name, deactivated),
unlink(From),
exit(shutdown);
+ {From, get_checkpoint} ->
+ reply(From, Name, Cp),
+ retainer_loop(Cp);
+ {_From, {add_retainer, R, Node}} ->
+ Cp2 = do_add_retainer(Cp, R, Node),
+ retainer_loop(Cp2);
+
+ {From, collect_pending} ->
+ PendingTab = Cp#checkpoint_args.pending_tab,
+ del(pending_checkpoints, PendingTab),
+ Pending = ?ets_match_object(PendingTab, '_'),
+ reply(From, Name, {ok, Pending}),
+ retainer_loop(Cp);
+
+ {_From, {mnesia_down, Node}} ->
+ Cp2 = do_del_retainers(Cp, Node),
+ retainer_loop(Cp2);
+
{'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
%% do_stop(Cp),
%% assume that entire Mnesia is terminating
exit(shutdown);
- {_From, {mnesia_down, Node}} ->
- Cp2 = do_del_retainers(Cp, Node),
- retainer_loop(Cp2);
+ {'EXIT', From, _Reason} ->
+ Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
+ check_iter(From, Iter)],
+ retainer_loop(Cp#checkpoint_args{iterators = Iters});
+
+ {system, From, Msg} ->
+ dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
+ sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
+ ?MODULE, [], Cp)
+ end;
+
+retainer_loop(Cp = #checkpoint_args{name=Name}) ->
+ receive
+ {_From, {retain, Tid, Tab, Key, OldRecs}} ->
+ R = val({Tab, {retainer, Name}}),
+ PendingTab = Cp#checkpoint_args.pending_tab,
+ case R#retainer.really_retain of
+ true ->
+ Store = R#retainer.store,
+ try true = ets:member(PendingTab, Tid),
+ %% io:format("CP: ~p ~p ~p ~p~n",[true, Tab, Key, Tid]),
+ case retainer_get(Store, Key) of
+ [] -> ignore;
+ _ -> ets:delete(element(2,Store), Key)
+ end
+ catch _:_ ->
+ %% io:format("CP: ~p ~p ~p ~p~n",[false, Tab, Key, Tid]),
+ case retainer_get(Store, Key) of
+ [] -> retainer_put(Store, {Tab, Key, OldRecs});
+ _ -> already_retained
+ end
+ end;
+ false ->
+ ignore
+ end,
+ retainer_loop(Cp);
+
+ %% Adm
{From, get_checkpoint} ->
reply(From, Name, Cp),
retainer_loop(Cp);
- {From, {add_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {From, {add_copy, Tab, Node}} ->
{Res, Cp2} = do_add_copy(Cp, Tab, Node),
reply(From, Name, Res),
retainer_loop(Cp2);
- {From, {del_copy, Tab, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {From, {del_copy, Tab, Node}} ->
Cp2 = do_del_copy(Cp, Tab, Node),
reply(From, Name, ok),
retainer_loop(Cp2);
- {From, {change_copy, Tab, From, To}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {From, {change_copy, Tab, From, To}} ->
Cp2 = do_change_copy(Cp, Tab, From, To),
reply(From, Name, ok),
retainer_loop(Cp2);
{_From, {add_retainer, R, Node}} ->
Cp2 = do_add_retainer(Cp, R, Node),
retainer_loop(Cp2);
- {_From, {del_retainer, R, Node}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {_From, {del_retainer, R, Node}} ->
Cp2 = do_del_retainer(Cp, R, Node),
retainer_loop(Cp2);
%% Iteration
- {From, {iter_begin, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {From, {iter_begin, Iter}} ->
Cp2 = iter_begin(Cp, From, Iter),
retainer_loop(Cp2);
- {From, {iter_end, Iter}} when Cp#checkpoint_args.wait_for_old == [] ->
+ {From, {iter_end, Iter}} ->
retainer_fixtable(Iter#iter.oid_tab, false),
Iters = Cp#checkpoint_args.iterators -- [Iter],
reply(From, Name, ok),
- retainer_loop(Cp#checkpoint_args{iterators = Iters});
-
- {_From, {exit_pending, Tid}}
- when is_list(Cp#checkpoint_args.wait_for_old) ->
- StillPending = lists:delete(Tid, Cp#checkpoint_args.wait_for_old),
- Cp2 = Cp#checkpoint_args{wait_for_old = StillPending},
- Cp3 = maybe_activate(Cp2),
- retainer_loop(Cp3);
+ retainer_loop(Cp#checkpoint_args{iterators = Iters});
- {From, collect_pending} ->
- PendingTab = Cp#checkpoint_args.pending_tab,
- del(pending_checkpoints, PendingTab),
- Pending = ?ets_match_object(PendingTab, '_'),
- reply(From, Name, {ok, Pending}),
+ {_From, {exit_pending, _Tid}} ->
retainer_loop(Cp);
- {From, {activate, Pending}} ->
- StillPending = mnesia_recover:still_pending(Pending),
- enter_still_pending(StillPending, Cp#checkpoint_args.pending_tab),
- Cp2 = maybe_activate(Cp#checkpoint_args{wait_for_old = StillPending}),
- reply(From, Name, activated),
+ {From, deactivate} ->
+ do_stop(Cp),
+ reply(From, Name, deactivated),
+ unlink(From),
+ exit(shutdown);
+
+ {_From, {mnesia_down, Node}} ->
+ Cp2 = do_del_retainers(Cp, Node),
retainer_loop(Cp2);
+ {'EXIT', Parent, _} when Parent == Cp#checkpoint_args.supervisor ->
+ %% do_stop(Cp),
+ %% assume that entire Mnesia is terminating
+ exit(shutdown);
+
{'EXIT', From, _Reason} ->
Iters = [Iter || Iter <- Cp#checkpoint_args.iterators,
check_iter(From, Iter)],
@@ -903,13 +915,17 @@ retainer_loop(Cp) ->
{system, From, Msg} ->
dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
- sys:handle_system_msg(Msg, From, no_parent, ?MODULE, [], Cp)
+ sys:handle_system_msg(Msg, From, Cp#checkpoint_args.supervisor,
+ ?MODULE, [], Cp);
+ Msg ->
+ dbg_out("~p got ~p~n", [?MODULE, Msg])
end.
maybe_activate(Cp)
- when Cp#checkpoint_args.wait_for_old == [],
- Cp#checkpoint_args.is_activated == false ->
- Cp#checkpoint_args{pending_tab = undefined, is_activated = true};
+ when Cp#checkpoint_args.wait_for_old == [],
+ Cp#checkpoint_args.is_activated == false ->
+ Cp#checkpoint_args{%% pending_tab = undefined,
+ is_activated = true};
maybe_activate(Cp) ->
Cp.
@@ -1226,65 +1242,6 @@ system_terminate(_Reason, _Parent,_Debug, Cp) ->
system_code_change(Cp, _Module, _OldVsn, _Extra) ->
{ok, Cp}.
-convert_cp_record(Cp) when is_record(Cp, checkpoint) ->
- ROD =
- case Cp#checkpoint.ram_overrides_dump of
- true -> Cp#checkpoint.min ++ Cp#checkpoint.max;
- false -> []
- end,
-
- {ok, #checkpoint_args{name = Cp#checkpoint.name,
- allow_remote = Cp#checkpoint.name,
- ram_overrides_dump = ROD,
- nodes = Cp#checkpoint.nodes,
- node = Cp#checkpoint.node,
- now = Cp#checkpoint.now,
- cookie = ?unique_cookie,
- min = Cp#checkpoint.min,
- max = Cp#checkpoint.max,
- pending_tab = Cp#checkpoint.pending_tab,
- wait_for_old = Cp#checkpoint.wait_for_old,
- is_activated = Cp#checkpoint.is_activated,
- ignore_new = Cp#checkpoint.ignore_new,
- retainers = Cp#checkpoint.retainers,
- iterators = Cp#checkpoint.iterators,
- supervisor = Cp#checkpoint.supervisor,
- pid = Cp#checkpoint.pid
- }};
-convert_cp_record(Cp) when is_record(Cp, checkpoint_args) ->
- AllTabs = Cp#checkpoint_args.min ++ Cp#checkpoint_args.max,
- ROD = case Cp#checkpoint_args.ram_overrides_dump of
- [] ->
- false;
- AllTabs ->
- true;
- _ ->
- error
- end,
- if
- ROD == error ->
- {error, {"Old node cannot handle new checkpoint protocol",
- ram_overrides_dump}};
- true ->
- {ok, #checkpoint{name = Cp#checkpoint_args.name,
- allow_remote = Cp#checkpoint_args.name,
- ram_overrides_dump = ROD,
- nodes = Cp#checkpoint_args.nodes,
- node = Cp#checkpoint_args.node,
- now = Cp#checkpoint_args.now,
- min = Cp#checkpoint_args.min,
- max = Cp#checkpoint_args.max,
- pending_tab = Cp#checkpoint_args.pending_tab,
- wait_for_old = Cp#checkpoint_args.wait_for_old,
- is_activated = Cp#checkpoint_args.is_activated,
- ignore_new = Cp#checkpoint_args.ignore_new,
- retainers = Cp#checkpoint_args.retainers,
- iterators = Cp#checkpoint_args.iterators,
- supervisor = Cp#checkpoint_args.supervisor,
- pid = Cp#checkpoint_args.pid
- }}
- end.
-
%%%%%%%%%%%%%%%%%%%%%%%%%%
val(Var) ->