diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_tm.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_tm.erl | 226 |
1 files changed, 131 insertions, 95 deletions
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl index 6888f61dbd..b116b48312 100644 --- a/lib/mnesia/src/mnesia_tm.erl +++ b/lib/mnesia/src/mnesia_tm.erl @@ -42,7 +42,8 @@ put_activity_id/2, block_tab/1, unblock_tab/1, - fixtable/3 + fixtable/3, + new_cr_format/1 ]). %% sys callback functions @@ -206,10 +207,10 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= {_From, {async_dirty, Tid, Commit, Tab}} -> case lists:member(Tab, State#state.blocked_tabs) of false -> - do_async_dirty(Tid, Commit, Tab), + do_async_dirty(Tid, new_cr_format(Commit), Tab), doit_loop(State); true -> - Item = {async_dirty, Tid, Commit, Tab}, + Item = {async_dirty, Tid, new_cr_format(Commit), Tab}, State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]}, doit_loop(State2) end; @@ -217,10 +218,10 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= {From, {sync_dirty, Tid, Commit, Tab}} -> case lists:member(Tab, State#state.blocked_tabs) of false -> - do_sync_dirty(From, Tid, Commit, Tab), + do_sync_dirty(From, Tid, new_cr_format(Commit), Tab), doit_loop(State); true -> - Item = {sync_dirty, From, Tid, Commit, Tab}, + Item = {sync_dirty, From, Tid, new_cr_format(Commit), Tab}, State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]}, doit_loop(State2) end; @@ -241,10 +242,11 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor= reply(From, {error, {system_limit, Msg, Reason}}, State) end; - {From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} -> + {From, {ask_commit, Protocol, Tid, Commit0, DiscNs, RamNs}} -> ?eval_debug_fun({?MODULE, doit_ask_commit}, [{tid, Tid}, {prot, Protocol}]), mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs), + Commit = new_cr_format(Commit0), Pid = case Protocol of asym_trans when node(Tid#tid.pid) /= node() -> @@ -1137,14 +1139,14 @@ arrange(Tid, Store, Type) -> reverse([]) -> []; reverse([H=#commit{ram_copies=Ram, disc_copies=DC, - disc_only_copies=DOC,snmp = Snmp} + disc_only_copies=DOC, ext=Ext} |R]) -> [ H#commit{ - ram_copies = lists:reverse(Ram), - disc_copies = lists:reverse(DC), - disc_only_copies = lists:reverse(DOC), - snmp = lists:reverse(Snmp) + ram_copies = lists:reverse(Ram), + disc_copies = lists:reverse(DC), + disc_only_copies = lists:reverse(DOC), + ext = [{Type, lists:reverse(E)} || {Type,E} <- Ext] } | reverse(R)]. @@ -1311,8 +1313,13 @@ pick_node({dirty,_}, Node, [], Done) -> pick_node(_Tid, Node, [], _Done) -> mnesia:abort({bad_commit, {missing_lock, Node}}). -prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind == snmp -> - Rec2 = Rec#commit{snmp = [Item | Rec#commit.snmp]}, +prepare_node(Node, Storage, [Item | Items], #commit{ext=Ext0}=Rec, Kind) when Kind == snmp -> + Rec2 = case lists:keytake(snmp, 1, Ext0) of + false -> + Rec#commit{ext = [{snmp,[Item]}|Ext0]}; + {_, {snmp,Snmp},Ext} -> + Rec#commit{ext = [{snmp,[Item|Snmp]}|Ext]} + end, prepare_node(Node, Storage, Items, Rec2, Kind); prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema -> Rec2 = @@ -1323,7 +1330,15 @@ prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema -> Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]}; disc_only_copies -> Rec#commit{disc_only_copies = - [Item | Rec#commit.disc_only_copies]} + [Item | Rec#commit.disc_only_copies]}; + {ext, Alias, Mod} -> + Ext0 = Rec#commit.ext, + case lists:keytake(ext_copies, 1, Ext0) of + false -> + Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}]}|Ext0]}; + {_,{_,EC},Ext} -> + Rec#commit{ext = [{ext_copies, [{{ext,Alias,Mod}, Item}|EC]}|Ext]} + end end, prepare_node(Node, Storage, Items, Rec2, Kind); prepare_node(_Node, _Storage, Items, Rec, Kind) @@ -1750,16 +1765,30 @@ do_commit(Tid, Bin) when is_binary(Bin) -> do_commit(Tid, binary_to_term(Bin)); do_commit(Tid, C) -> do_commit(Tid, C, optional). + do_commit(Tid, Bin, DumperMode) when is_binary(Bin) -> do_commit(Tid, binary_to_term(Bin), DumperMode); do_commit(Tid, C, DumperMode) -> mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode), - R = do_snmp(Tid, C#commit.snmp), + R = do_snmp(Tid, proplists:get_value(snmp, C#commit.ext, [])), R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R), R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2), R4 = do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3), + R5 = do_update_ext(Tid, C#commit.ext, R4), mnesia_subscr:report_activity(Tid), - R4. + R5. + +%% This could/should be optimized +do_update_ext(_Tid, [], OldRes) -> OldRes; +do_update_ext(Tid, Ext, OldRes) -> + case lists:keyfind(ext_copies, 1, Ext) of + false -> OldRes; + {_, Ops} -> + Do = fun({{ext, _,_} = Storage, Op}, R) -> + do_update(Tid, Storage, [Op], R) + end, + lists:foldl(Do, OldRes, Ops) + end. %% Update the items do_update(Tid, Storage, [Op | Ops], OldRes) -> @@ -1782,12 +1811,12 @@ do_update(_Tid, _Storage, [], Res) -> Res. do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) -> - commit_write(?catch_val({Tab, commit_work}), Tid, + commit_write(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Obj, undefined), mnesia_lib:db_put(Storage, Tab, Obj); do_update_op(Tid, Storage, {{Tab, K}, Val, delete}) -> - commit_delete(?catch_val({Tab, commit_work}), Tid, Tab, K, Val, undefined), + commit_delete(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, Val, undefined), mnesia_lib:db_erase(Storage, Tab, K); do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) -> @@ -1805,86 +1834,84 @@ do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) -> mnesia_lib:db_put(Storage, Tab, Zero), {Zero, []} end, - commit_update(?catch_val({Tab, commit_work}), Tid, Tab, + commit_update(?catch_val({Tab, commit_work}), Tid, Storage, Tab, K, NewObj, OldObjs), element(3, NewObj); do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) -> commit_del_object(?catch_val({Tab, commit_work}), - Tid, Tab, Key, Obj, undefined), + Tid, Storage, Tab, Key, Obj), mnesia_lib:db_match_erase(Storage, Tab, Obj); do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) -> - commit_clear(?catch_val({Tab, commit_work}), Tid, Tab, Key, Obj), + commit_clear(?catch_val({Tab, commit_work}), Tid, Storage, Tab, Key, Obj), mnesia_lib:db_match_erase(Storage, Tab, Obj). -commit_write([], _, _, _, _, _) -> ok; -commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) -> +commit_write([], _, _, _, _, _, _) -> ok; +commit_write([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, Old) -> mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList), - commit_write(R, Tid, Tab, K, Obj, Old); -commit_write([H|R], Tid, Tab, K, Obj, Old) + commit_write(R, Tid, Storage, Tab, K, Obj, Old); +commit_write([H|R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == subscribers -> mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old), - commit_write(R, Tid, Tab, K, Obj, Old); -commit_write([H|R], Tid, Tab, K, Obj, Old) + commit_write(R, Tid, Storage, Tab, K, Obj, Old); +commit_write([H|R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == index -> - mnesia_index:add_index(H, Tab, K, Obj, Old), - commit_write(R, Tid, Tab, K, Obj, Old). + mnesia_index:add_index(H, Storage, Tab, K, Obj, Old), + commit_write(R, Tid, Storage, Tab, K, Obj, Old). -commit_update([], _, _, _, _, _) -> ok; -commit_update([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) -> +commit_update([], _, _, _, _, _, _) -> ok; +commit_update([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, _) -> Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList), - commit_update(R, Tid, Tab, K, Obj, Old); -commit_update([H|R], Tid, Tab, K, Obj, Old) + commit_update(R, Tid, Storage, Tab, K, Obj, Old); +commit_update([H|R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == subscribers -> mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old), - commit_update(R, Tid, Tab, K, Obj, Old); -commit_update([H|R], Tid, Tab, K, Obj, Old) + commit_update(R, Tid, Storage, Tab, K, Obj, Old); +commit_update([H|R], Tid,Storage, Tab, K, Obj, Old) when element(1, H) == index -> - mnesia_index:add_index(H, Tab, K, Obj, Old), - commit_update(R, Tid, Tab, K, Obj, Old). + mnesia_index:add_index(H, Storage, Tab, K, Obj, Old), + commit_update(R, Tid, Storage, Tab, K, Obj, Old). -commit_delete([], _, _, _, _, _) -> ok; -commit_delete([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) -> +commit_delete([], _, _, _, _, _, _) -> ok; +commit_delete([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj, _) -> Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete, CpList), - commit_delete(R, Tid, Tab, K, Obj, Old); -commit_delete([H|R], Tid, Tab, K, Obj, Old) + commit_delete(R, Tid, Storage, Tab, K, Obj, Old); +commit_delete([H|R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == subscribers -> mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete, Old), - commit_delete(R, Tid, Tab, K, Obj, Old); -commit_delete([H|R], Tid, Tab, K, Obj, Old) + commit_delete(R, Tid, Storage, Tab, K, Obj, Old); +commit_delete([H|R], Tid, Storage, Tab, K, Obj, Old) when element(1, H) == index -> - mnesia_index:delete_index(H, Tab, K), - commit_delete(R, Tid, Tab, K, Obj, Old). + mnesia_index:delete_index(H, Storage, Tab, K), + commit_delete(R, Tid, Storage, Tab, K, Obj, Old). commit_del_object([], _, _, _, _, _) -> ok; -commit_del_object([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) -> - Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList), - commit_del_object(R, Tid, Tab, K, Obj, Old); -commit_del_object([H|R], Tid, Tab, K, Obj, Old) - when element(1, H) == subscribers -> - mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object, Old), - commit_del_object(R, Tid, Tab, K, Obj, Old); -commit_del_object([H|R], Tid, Tab, K, Obj, Old) - when element(1, H) == index -> - mnesia_index:del_object_index(H, Tab, K, Obj, Old), - commit_del_object(R, Tid, Tab, K, Obj, Old). - -commit_clear([], _, _, _, _) -> ok; -commit_clear([{checkpoints, CpList}|R], Tid, Tab, K, Obj) -> +commit_del_object([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj) -> + mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList), + commit_del_object(R, Tid, Storage, Tab, K, Obj); +commit_del_object([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == subscribers -> + mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object), + commit_del_object(R, Tid, Storage, Tab, K, Obj); +commit_del_object([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == index -> + mnesia_index:del_object_index(H, Storage, Tab, K, Obj), + commit_del_object(R, Tid, Storage, Tab, K, Obj). + +commit_clear([], _, _, _, _, _) -> ok; +commit_clear([{checkpoints, CpList}|R], Tid, Storage, Tab, K, Obj) -> mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList), - commit_clear(R, Tid, Tab, K, Obj); -commit_clear([H|R], Tid, Tab, K, Obj) + commit_clear(R, Tid, Storage, Tab, K, Obj); +commit_clear([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == subscribers -> mnesia_subscr:report_table_event(H, Tab, Tid, Obj, clear_table, undefined), - commit_clear(R, Tid, Tab, K, Obj); -commit_clear([H|R], Tid, Tab, K, Obj) + commit_clear(R, Tid, Storage, Tab, K, Obj); +commit_clear([H|R], Tid, Storage, Tab, K, Obj) when element(1, H) == index -> mnesia_index:clear_index(H, Tab, K, Obj), - commit_clear(R, Tid, Tab, K, Obj). + commit_clear(R, Tid, Storage, Tab, K, Obj). do_snmp(_, []) -> ok; -do_snmp(Tid, [Head | Tail]) -> +do_snmp(Tid, [Head|Tail]) -> try mnesia_snmp_hook:update(Head) catch _:Reason -> %% This should only happen when we recently have @@ -1896,31 +1923,34 @@ do_snmp(Tid, [Head | Tail]) -> end, do_snmp(Tid, Tail). -commit_nodes([C | Tail], AccD, AccR) - when C#commit.disc_copies == [], - C#commit.disc_only_copies == [], - C#commit.schema_ops == [] -> - commit_nodes(Tail, AccD, [C#commit.node | AccR]); commit_nodes([C | Tail], AccD, AccR) -> - commit_nodes(Tail, [C#commit.node | AccD], AccR); + case C of + #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} -> + case lists:keyfind(ext_copies, 1, Ext) of + false -> commit_nodes(Tail, AccD, [C#commit.node | AccR]); + _ -> commit_nodes(Tail, [C#commit.node | AccD], AccR) + end; + _ -> + commit_nodes(Tail, [C#commit.node | AccD], AccR) + end; commit_nodes([], AccD, AccR) -> {AccD, AccR}. commit_decision(D, [C | Tail], AccD, AccR) -> N = C#commit.node, {D2, Tail2} = - case C#commit.schema_ops of - [] when C#commit.disc_copies == [], - C#commit.disc_only_copies == [] -> - commit_decision(D, Tail, AccD, [N | AccR]); - [] -> + case C of + #commit{disc_copies=[], disc_only_copies=[], schema_ops=[], ext=Ext} -> + case lists:keyfind(ext_copies, 1, Ext) of + false -> commit_decision(D, Tail, AccD, [N | AccR]); + _ -> commit_decision(D, Tail, [N | AccD], AccR) + end; + #commit{schema_ops=[]} -> commit_decision(D, Tail, [N | AccD], AccR); - Ops -> + #commit{schema_ops=Ops} -> case ram_only_ops(N, Ops) of - true -> - commit_decision(D, Tail, AccD, [N | AccR]); - false -> - commit_decision(D, Tail, [N | AccD], AccR) + true -> commit_decision(D, Tail, AccD, [N | AccR]); + false -> commit_decision(D, Tail, [N | AccD], AccR) end end, {D2, [C#commit{decision = D2} | Tail2]}; @@ -1948,7 +1978,7 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) -> Res = do_dirty(Tid, Head), {WF, Res}; true -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, + {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}}, sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor]) end; sync_send_dirty(_Tid, [], _Tab, WaitFor) -> @@ -1967,11 +1997,11 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) -> NewRes = do_dirty(Tid, Head), async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes); ReadNode == Node -> - {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}}, + {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}}, NewRes = {'EXIT', {aborted, {node_not_running, Node}}}, async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes); true -> - {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}}, + {?MODULE, Node} ! {self(), {async_dirty, Tid, ext_format(Head), Tab}}, async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res) end; async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) -> @@ -2028,23 +2058,29 @@ ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) -> Node == node() -> ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head); true -> - Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs), - Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs}, + CR = ext_format(Head), + Msg = {ask_commit, Protocol, Tid, CR, DiscNs, RamNs}, {?MODULE, Node} ! {self(), Msg}, ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local) end; ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) -> {WaitFor, Local}. -%% This used to test protocol conversion between mnesia-nodes -%% but it is really dependent on the emulator version on the -%% two nodes (if funs are sent which they are in transform table op). -%% to be safe we let erts do the translation (many times maybe and thus -%% slower but it works. -% opt_term_to_binary(asym_trans, Head, Nodes) -> -% opt_term_to_binary(Nodes, Head); -opt_term_to_binary(_Protocol, Head, _Nodes) -> - Head. +ext_format(#commit{ext=[]}=CR) -> CR; +ext_format(#commit{node=Node, ext=Ext}=CR) -> + case mnesia_monitor:needs_protocol_conversion(Node) of + true -> + case lists:keyfind(snmp, 1, Ext) of + false -> CR#commit{ext=[]}; + {snmp, List} -> CR#commit{ext=List} + end; + false -> CR + end. + +new_cr_format(#commit{ext=[]}=Cr) -> Cr; +new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr; +new_cr_format(#commit{ext=Snmp}=Cr) -> + Cr#commit{ext=[{snmp,Snmp}]}. rec_all([Node | Tail], Tid, Res, Pids) -> receive |