diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_tm.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_tm.erl | 99 |
1 files changed, 56 insertions, 43 deletions
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl index 8b79fca1d7..8a4113422a 100644 --- a/lib/mnesia/src/mnesia_tm.erl +++ b/lib/mnesia/src/mnesia_tm.erl @@ -26,7 +26,7 @@ init/1, non_transaction/5, transaction/6, - commit_participant/5, + commit_participant/6, dirty/2, display_info/2, do_update_op/3, @@ -62,13 +62,14 @@ %% Format on coordinators is [{Tid, EtsTabList} ..... -record(prep, {protocol = sym_trans, - %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans + %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans | sync_asym_trans records = [], prev_tab = [], % initiate to a non valid table name prev_types, prev_snmp, types, - majority = [] + majority = [], + sync = false }). -record(participant, {tid, pid, commit, disc_nodes = [], @@ -250,11 +251,13 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs), Commit = new_cr_format(Commit0), Pid = - case Protocol of - asym_trans when node(Tid#tid.pid) /= node() -> - Args = [tmpid(From), Tid, Commit, DiscNs, RamNs], + if + node(Tid#tid.pid) =:= node() -> + error({internal_error, local_node}); + Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans -> + Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs], spawn_link(?MODULE, commit_participant, Args); - _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans + true -> %% *_sym_trans reply(From, {vote_yes, Tid}), nopid end, @@ -1190,7 +1193,15 @@ do_arrange(Tid, Store, RestoreKey, Prep, N) when RestoreKey == restore_op -> P2 = Prep#prep{protocol = asym_trans, records = Recs2}, do_arrange(Tid, Store, ?ets_next(Store, RestoreKey), P2, N + 1); do_arrange(_Tid, _Store, '$end_of_table', Prep, N) -> - {N, Prep}; + case Prep of + #prep{sync=true, protocol=asym_trans} -> + {N, Prep#prep{protocol=sync_asym_trans}}; + _ -> + {N, Prep} + end; +do_arrange(Tid, Store, sticky, Prep, N) -> + P2 = Prep#prep{sync=true}, + do_arrange(Tid, Store, ?ets_next(Store, sticky), P2, N); do_arrange(Tid, Store, IgnoredKey, Prep, N) -> %% locks, nodes ... local atoms... do_arrange(Tid, Store, ?ets_next(Store, IgnoredKey), Prep, N). @@ -1448,7 +1459,8 @@ multi_commit(sync_sym_trans, _Maj = [], Tid, CR, Store) -> [{tid, Tid}, {outcome, Outcome}]), Outcome; -multi_commit(asym_trans, Majority, Tid, CR, Store) -> +multi_commit(Protocol, Majority, Tid, CR, Store) + when Protocol =:= asym_trans; Protocol =:= sync_asym_trans -> %% This more expensive commit protocol is used when %% table definitions are changed (schema transactions). %% It is also used when the involved tables are @@ -1515,7 +1527,7 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) -> end, Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs), ?ets_insert(Store, Pending), - {WaitFor, Local} = ask_commit(asym_trans, Tid, CR2, DiscNs, RamNs), + {WaitFor, Local} = ask_commit(Protocol, Tid, CR2, DiscNs, RamNs), SchemaPrep = ?CATCH(mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})), {Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []), @@ -1563,38 +1575,38 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) -> %% Returns do_commit or {do_abort, Reason} rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode, - GoodPids, SchemaAckPids) -> + GoodPids, AckPids) -> receive {?MODULE, _, {acc_pre_commit, Tid, Pid, true}} -> rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode, - [Pid | GoodPids], [Pid | SchemaAckPids]); + [Pid | GoodPids], [Pid | AckPids]); {?MODULE, _, {acc_pre_commit, Tid, Pid, false}} -> rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode, - [Pid | GoodPids], SchemaAckPids); + [Pid | GoodPids], AckPids); {?MODULE, _, {acc_pre_commit, Tid, Pid}} -> %% Kept for backwards compatibility. Remove after Mnesia 4.x rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode, - [Pid | GoodPids], [Pid | SchemaAckPids]); + [Pid | GoodPids], [Pid | AckPids]); {?MODULE, _, {do_abort, Tid, Pid, _Reason}} -> AbortRes = {do_abort, {bad_commit, node(Pid)}}, rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode, - GoodPids, SchemaAckPids); + GoodPids, AckPids); {mnesia_down, Node} when Node == node(Pid) -> AbortRes = {do_abort, {bad_commit, Node}}, ?SAFE(Pid ! {Tid, AbortRes}), %% Tell him that he has died rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode, - GoodPids, SchemaAckPids) + GoodPids, AckPids) end; -rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, SchemaAckPids) -> +rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, AckPids) -> D = Commit#commit.decision, case Res of do_commit -> %% Now everybody knows that the others %% has voted yes. We also know that %% everybody are uncertain. - prepare_sync_schema_commit(Store, SchemaAckPids), + prepare_sync_schema_commit(Store, AckPids), tell_participants(GoodPids, {Tid, committed}), D2 = D#decision{outcome = committed}, mnesia_recover:log_decision(D2), @@ -1606,7 +1618,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc do_commit(Tid, Commit, DumperMode), ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit}, [{tid, Tid}]), - sync_schema_commit(Tid, Store, SchemaAckPids), + sync_schema_commit(Tid, Store, AckPids), mnesia_locker:release_tid(Tid), ?MODULE ! {delete_transaction, Tid}; @@ -1623,6 +1635,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc Res. %% Note all nodes in case of mnesia_down mgt +%% sync_schema_commit is (ab)used for sync_asym_trans as well. prepare_sync_schema_commit(_Store, []) -> ok; prepare_sync_schema_commit(Store, [Pid | Pids]) -> @@ -1648,17 +1661,17 @@ tell_participants([Pid | Pids], Msg) -> tell_participants([], _Msg) -> ok. --spec commit_participant(_, _, _, _, _) -> no_return(). +-spec commit_participant(_, _, _, _, _, _) -> no_return(). %% Trap exit because we can get a shutdown from application manager -commit_participant(Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) -> +commit_participant(Protocol, Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) -> process_flag(trap_exit, true), Commit = binary_to_term(Bin), - commit_participant(Coord, Tid, Bin, Commit, DiscNs, RamNs); -commit_participant(Coord, Tid, C = #commit{}, DiscNs, RamNs) -> + commit_participant(Protocol, Coord, Tid, Bin, Commit, DiscNs, RamNs); +commit_participant(Protocol, Coord, Tid, C = #commit{}, DiscNs, RamNs) -> process_flag(trap_exit, true), - commit_participant(Coord, Tid, C, C, DiscNs, RamNs). + commit_participant(Protocol, Coord, Tid, C, C, DiscNs, RamNs). -commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) -> +commit_participant(Protocol, Coord, Tid, Bin, C0, DiscNs, _RamNs) -> ?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]), try mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of {Modified, C = #commit{}, DumperMode} -> @@ -1683,8 +1696,9 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) -> mnesia_recover:log_decision(D#decision{outcome = unclear}), ?eval_debug_fun({?MODULE, commit_participant, pre_commit}, [{tid, Tid}]), - Expect_schema_ack = C#commit.schema_ops /= [], - reply(Coord, {acc_pre_commit, Tid, self(), Expect_schema_ack}), + ExpectAck = C#commit.schema_ops /= [] + orelse Protocol =:= sync_asym_trans, + reply(Coord, {acc_pre_commit, Tid, self(), ExpectAck}), %% Now we are vulnerable for failures, since %% we cannot decide without asking others @@ -1694,7 +1708,7 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) -> ?eval_debug_fun({?MODULE, commit_participant, log_commit}, [{tid, Tid}]), do_commit(Tid, C, DumperMode), - case Expect_schema_ack of + case ExpectAck of false -> ignore; true -> reply(Coord, {schema_commit, Tid, self()}) end, @@ -1978,7 +1992,7 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) -> Res = do_dirty(Tid, Head), {WF, Res}; true -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}}, + {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor]) end; sync_send_dirty(_Tid, [], _Tab, WaitFor) -> @@ -1997,11 +2011,11 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) -> NewRes = do_dirty(Tid, Head), async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes); ReadNode == Node -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}}, + {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, NewRes = {'EXIT', {aborted, {node_not_running, Node}}}, async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes); true -> - {?MODULE, Node} ! {self(), {async_dirty, Tid, ext_format(Head), Tab}}, + {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}}, async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res) end; async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) -> @@ -2058,24 +2072,20 @@ ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) -> Node == node() -> ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head); true -> - CR = ext_format(Head), - Msg = {ask_commit, Protocol, Tid, CR, DiscNs, RamNs}, + Msg = {ask_commit, convert_old(Protocol, Node), Tid, Head, DiscNs, RamNs}, {?MODULE, Node} ! {self(), Msg}, ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local) end; ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) -> {WaitFor, Local}. -ext_format(#commit{ext=[]}=CR) -> CR; -ext_format(#commit{node=Node, ext=Ext}=CR) -> +convert_old(sync_asym_trans, Node) -> case mnesia_monitor:needs_protocol_conversion(Node) of - true -> - case lists:keyfind(snmp, 1, Ext) of - false -> CR#commit{ext=[]}; - {snmp, List} -> CR#commit{ext=List} - end; - false -> CR - end. + true -> asym_trans; + false -> sync_asym_trans + end; +convert_old(Protocol, _) -> + Protocol. new_cr_format(#commit{ext=[]}=Cr) -> Cr; new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr; @@ -2304,7 +2314,7 @@ reconfigure_participants(_, []) -> %% tell mnesia_tm on all involved nodes (including the local node) %% about the outcome. tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) -> - Outcome = mnesia_recover:what_happened(Tid, Protocol, CheckNodes), + Outcome = mnesia_recover:what_happened(Tid, proto(Protocol), CheckNodes), case Outcome of aborted -> rpc:abcast(TellNodes, ?MODULE, {Tid,{do_abort, {mnesia_down, Node}}}); @@ -2313,6 +2323,9 @@ tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) -> end, Outcome. +proto(sync_asym_trans) -> asym_trans; +proto(Proto) -> Proto. + do_stop(#state{coordinators = Coordinators}) -> Msg = {mnesia_down, node()}, lists:foreach(fun({Tid, _}) -> Tid#tid.pid ! Msg end, gb_trees:to_list(Coordinators)), |