aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src/mnesia_tm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/mnesia/src/mnesia_tm.erl')
-rw-r--r--lib/mnesia/src/mnesia_tm.erl99
1 files changed, 56 insertions, 43 deletions
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 8b79fca1d7..8a4113422a 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -26,7 +26,7 @@
init/1,
non_transaction/5,
transaction/6,
- commit_participant/5,
+ commit_participant/6,
dirty/2,
display_info/2,
do_update_op/3,
@@ -62,13 +62,14 @@
%% Format on coordinators is [{Tid, EtsTabList} .....
-record(prep, {protocol = sym_trans,
- %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans
+ %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans | sync_asym_trans
records = [],
prev_tab = [], % initiate to a non valid table name
prev_types,
prev_snmp,
types,
- majority = []
+ majority = [],
+ sync = false
}).
-record(participant, {tid, pid, commit, disc_nodes = [],
@@ -250,11 +251,13 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
Commit = new_cr_format(Commit0),
Pid =
- case Protocol of
- asym_trans when node(Tid#tid.pid) /= node() ->
- Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
+ if
+ node(Tid#tid.pid) =:= node() ->
+ error({internal_error, local_node});
+ Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
+ Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
spawn_link(?MODULE, commit_participant, Args);
- _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
+ true -> %% *_sym_trans
reply(From, {vote_yes, Tid}),
nopid
end,
@@ -1190,7 +1193,15 @@ do_arrange(Tid, Store, RestoreKey, Prep, N) when RestoreKey == restore_op ->
P2 = Prep#prep{protocol = asym_trans, records = Recs2},
do_arrange(Tid, Store, ?ets_next(Store, RestoreKey), P2, N + 1);
do_arrange(_Tid, _Store, '$end_of_table', Prep, N) ->
- {N, Prep};
+ case Prep of
+ #prep{sync=true, protocol=asym_trans} ->
+ {N, Prep#prep{protocol=sync_asym_trans}};
+ _ ->
+ {N, Prep}
+ end;
+do_arrange(Tid, Store, sticky, Prep, N) ->
+ P2 = Prep#prep{sync=true},
+ do_arrange(Tid, Store, ?ets_next(Store, sticky), P2, N);
do_arrange(Tid, Store, IgnoredKey, Prep, N) -> %% locks, nodes ... local atoms...
do_arrange(Tid, Store, ?ets_next(Store, IgnoredKey), Prep, N).
@@ -1448,7 +1459,8 @@ multi_commit(sync_sym_trans, _Maj = [], Tid, CR, Store) ->
[{tid, Tid}, {outcome, Outcome}]),
Outcome;
-multi_commit(asym_trans, Majority, Tid, CR, Store) ->
+multi_commit(Protocol, Majority, Tid, CR, Store)
+ when Protocol =:= asym_trans; Protocol =:= sync_asym_trans ->
%% This more expensive commit protocol is used when
%% table definitions are changed (schema transactions).
%% It is also used when the involved tables are
@@ -1515,7 +1527,7 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) ->
end,
Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
?ets_insert(Store, Pending),
- {WaitFor, Local} = ask_commit(asym_trans, Tid, CR2, DiscNs, RamNs),
+ {WaitFor, Local} = ask_commit(Protocol, Tid, CR2, DiscNs, RamNs),
SchemaPrep = ?CATCH(mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})),
{Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []),
@@ -1563,38 +1575,38 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) ->
%% Returns do_commit or {do_abort, Reason}
rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode,
- GoodPids, SchemaAckPids) ->
+ GoodPids, AckPids) ->
receive
{?MODULE, _, {acc_pre_commit, Tid, Pid, true}} ->
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
+ [Pid | GoodPids], [Pid | AckPids]);
{?MODULE, _, {acc_pre_commit, Tid, Pid, false}} ->
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], SchemaAckPids);
+ [Pid | GoodPids], AckPids);
{?MODULE, _, {acc_pre_commit, Tid, Pid}} ->
%% Kept for backwards compatibility. Remove after Mnesia 4.x
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
+ [Pid | GoodPids], [Pid | AckPids]);
{?MODULE, _, {do_abort, Tid, Pid, _Reason}} ->
AbortRes = {do_abort, {bad_commit, node(Pid)}},
rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
- GoodPids, SchemaAckPids);
+ GoodPids, AckPids);
{mnesia_down, Node} when Node == node(Pid) ->
AbortRes = {do_abort, {bad_commit, Node}},
?SAFE(Pid ! {Tid, AbortRes}), %% Tell him that he has died
rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
- GoodPids, SchemaAckPids)
+ GoodPids, AckPids)
end;
-rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, SchemaAckPids) ->
+rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, AckPids) ->
D = Commit#commit.decision,
case Res of
do_commit ->
%% Now everybody knows that the others
%% has voted yes. We also know that
%% everybody are uncertain.
- prepare_sync_schema_commit(Store, SchemaAckPids),
+ prepare_sync_schema_commit(Store, AckPids),
tell_participants(GoodPids, {Tid, committed}),
D2 = D#decision{outcome = committed},
mnesia_recover:log_decision(D2),
@@ -1606,7 +1618,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc
do_commit(Tid, Commit, DumperMode),
?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit},
[{tid, Tid}]),
- sync_schema_commit(Tid, Store, SchemaAckPids),
+ sync_schema_commit(Tid, Store, AckPids),
mnesia_locker:release_tid(Tid),
?MODULE ! {delete_transaction, Tid};
@@ -1623,6 +1635,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc
Res.
%% Note all nodes in case of mnesia_down mgt
+%% sync_schema_commit is (ab)used for sync_asym_trans as well.
prepare_sync_schema_commit(_Store, []) ->
ok;
prepare_sync_schema_commit(Store, [Pid | Pids]) ->
@@ -1648,17 +1661,17 @@ tell_participants([Pid | Pids], Msg) ->
tell_participants([], _Msg) ->
ok.
--spec commit_participant(_, _, _, _, _) -> no_return().
+-spec commit_participant(_, _, _, _, _, _) -> no_return().
%% Trap exit because we can get a shutdown from application manager
-commit_participant(Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
+commit_participant(Protocol, Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
process_flag(trap_exit, true),
Commit = binary_to_term(Bin),
- commit_participant(Coord, Tid, Bin, Commit, DiscNs, RamNs);
-commit_participant(Coord, Tid, C = #commit{}, DiscNs, RamNs) ->
+ commit_participant(Protocol, Coord, Tid, Bin, Commit, DiscNs, RamNs);
+commit_participant(Protocol, Coord, Tid, C = #commit{}, DiscNs, RamNs) ->
process_flag(trap_exit, true),
- commit_participant(Coord, Tid, C, C, DiscNs, RamNs).
+ commit_participant(Protocol, Coord, Tid, C, C, DiscNs, RamNs).
-commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
+commit_participant(Protocol, Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]),
try mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of
{Modified, C = #commit{}, DumperMode} ->
@@ -1683,8 +1696,9 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
mnesia_recover:log_decision(D#decision{outcome = unclear}),
?eval_debug_fun({?MODULE, commit_participant, pre_commit},
[{tid, Tid}]),
- Expect_schema_ack = C#commit.schema_ops /= [],
- reply(Coord, {acc_pre_commit, Tid, self(), Expect_schema_ack}),
+ ExpectAck = C#commit.schema_ops /= []
+ orelse Protocol =:= sync_asym_trans,
+ reply(Coord, {acc_pre_commit, Tid, self(), ExpectAck}),
%% Now we are vulnerable for failures, since
%% we cannot decide without asking others
@@ -1694,7 +1708,7 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
?eval_debug_fun({?MODULE, commit_participant, log_commit},
[{tid, Tid}]),
do_commit(Tid, C, DumperMode),
- case Expect_schema_ack of
+ case ExpectAck of
false -> ignore;
true -> reply(Coord, {schema_commit, Tid, self()})
end,
@@ -1978,7 +1992,7 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
Res = do_dirty(Tid, Head),
{WF, Res};
true ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor])
end;
sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
@@ -1997,11 +2011,11 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
NewRes = do_dirty(Tid, Head),
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes);
ReadNode == Node ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
NewRes = {'EXIT', {aborted, {node_not_running, Node}}},
async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes);
true ->
- {?MODULE, Node} ! {self(), {async_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}},
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
end;
async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
@@ -2058,24 +2072,20 @@ ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
Node == node() ->
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
- CR = ext_format(Head),
- Msg = {ask_commit, Protocol, Tid, CR, DiscNs, RamNs},
+ Msg = {ask_commit, convert_old(Protocol, Node), Tid, Head, DiscNs, RamNs},
{?MODULE, Node} ! {self(), Msg},
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
{WaitFor, Local}.
-ext_format(#commit{ext=[]}=CR) -> CR;
-ext_format(#commit{node=Node, ext=Ext}=CR) ->
+convert_old(sync_asym_trans, Node) ->
case mnesia_monitor:needs_protocol_conversion(Node) of
- true ->
- case lists:keyfind(snmp, 1, Ext) of
- false -> CR#commit{ext=[]};
- {snmp, List} -> CR#commit{ext=List}
- end;
- false -> CR
- end.
+ true -> asym_trans;
+ false -> sync_asym_trans
+ end;
+convert_old(Protocol, _) ->
+ Protocol.
new_cr_format(#commit{ext=[]}=Cr) -> Cr;
new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr;
@@ -2304,7 +2314,7 @@ reconfigure_participants(_, []) ->
%% tell mnesia_tm on all involved nodes (including the local node)
%% about the outcome.
tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
- Outcome = mnesia_recover:what_happened(Tid, Protocol, CheckNodes),
+ Outcome = mnesia_recover:what_happened(Tid, proto(Protocol), CheckNodes),
case Outcome of
aborted ->
rpc:abcast(TellNodes, ?MODULE, {Tid,{do_abort, {mnesia_down, Node}}});
@@ -2313,6 +2323,9 @@ tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
end,
Outcome.
+proto(sync_asym_trans) -> asym_trans;
+proto(Proto) -> Proto.
+
do_stop(#state{coordinators = Coordinators}) ->
Msg = {mnesia_down, node()},
lists:foreach(fun({Tid, _}) -> Tid#tid.pid ! Msg end, gb_trees:to_list(Coordinators)),