aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/src
diff options
context:
space:
mode:
authorDan Gudmundsson <[email protected]>2019-08-09 12:52:59 +0200
committerDan Gudmundsson <[email protected]>2019-08-09 12:52:59 +0200
commitd48dd93bee87d039b80553031e7fe6625feff908 (patch)
tree87a837c49ef8255c0df11a0d811f6166d6b0d030 /lib/mnesia/src
parent29a73cf252143a09457a22dca433c4486d1c497b (diff)
parent7e92eb6666f3d73eb683516bc4b74b827953ec4a (diff)
downloadotp-d48dd93bee87d039b80553031e7fe6625feff908.tar.gz
otp-d48dd93bee87d039b80553031e7fe6625feff908.tar.bz2
otp-d48dd93bee87d039b80553031e7fe6625feff908.zip
Merge branch 'dgud/mnesia/sticky-bug/ERL-768/OTP-15979' into maint
* dgud/mnesia/sticky-bug/ERL-768/OTP-15979: mnesia: Bump protocol version mnesia: Introduce sync_asym_trans protocol
Diffstat (limited to 'lib/mnesia/src')
-rw-r--r--lib/mnesia/src/mnesia_locker.erl2
-rw-r--r--lib/mnesia/src/mnesia_monitor.erl6
-rw-r--r--lib/mnesia/src/mnesia_schema.erl68
-rw-r--r--lib/mnesia/src/mnesia_tm.erl99
4 files changed, 71 insertions, 104 deletions
diff --git a/lib/mnesia/src/mnesia_locker.erl b/lib/mnesia/src/mnesia_locker.erl
index f68626413e..0222c5b1a0 100644
--- a/lib/mnesia/src/mnesia_locker.erl
+++ b/lib/mnesia/src/mnesia_locker.erl
@@ -774,10 +774,12 @@ do_sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) ->
N = node(),
receive
{?MODULE, N, granted} ->
+ ?ets_insert(Store, {sticky, true}),
?ets_insert(Store, {{locks, Tab, Key}, write}),
[?ets_insert(Store, {nodes, Node}) || Node <- WNodes],
granted;
{?MODULE, N, {granted, Val}} -> %% for rwlocks
+ ?ets_insert(Store, {sticky, true}),
case opt_lookup_in_client(Val, Oid, write) of
C = #cyclic{} ->
exit({aborted, C});
diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl
index 4cfe16dec0..4e50b46da8 100644
--- a/lib/mnesia/src/mnesia_monitor.erl
+++ b/lib/mnesia/src/mnesia_monitor.erl
@@ -83,9 +83,9 @@
going_down = [], tm_started = false, early_connects = [],
connecting, mq = [], remote_node_status = []}).
--define(current_protocol_version, {8,3}).
+-define(current_protocol_version, {8,4}).
--define(previous_protocol_version, {8,2}).
+-define(previous_protocol_version, {8,3}).
start() ->
gen_server:start_link({local, ?MODULE}, ?MODULE,
@@ -196,7 +196,7 @@ protocol_version() ->
%% A sorted list of acceptable protocols the
%% preferred protocols are first in the list
acceptable_protocol_versions() ->
- [protocol_version(), ?previous_protocol_version, {8,1}].
+ [protocol_version(), ?previous_protocol_version].
needs_protocol_conversion(Node) ->
case {?catch_val({protocol, Node}), protocol_version()} of
diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl
index 4a86aeb375..d0f5d0e07b 100644
--- a/lib/mnesia/src/mnesia_schema.erl
+++ b/lib/mnesia/src/mnesia_schema.erl
@@ -730,7 +730,10 @@ api_list2cs(Other) ->
mnesia:abort({badarg, Other}).
vsn_cs2list(Cs) ->
- cs2list(need_old_cstructs(), Cs).
+ cs2list(Cs).
+
+cs2list(false, Cs) ->
+ cs2list(Cs).
cs2list(Cs) when is_record(Cs, cstruct) ->
Tags = record_info(fields, cstruct),
@@ -755,25 +758,6 @@ cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 19 ->
cookie,version],
rec2list(Tags, Tags, 2, Cs).
-cs2list(false, Cs) ->
- cs2list(Cs);
-cs2list({8,3}, Cs) ->
- cs2list(Cs);
-cs2list({8,Minor}, Cs) when Minor =:= 2; Minor =:= 1 ->
- Orig = record_info(fields, cstruct),
- Tags = [name,type,ram_copies,disc_copies,disc_only_copies,
- load_order,access_mode,majority,index,snmp,local_content,
- record_name,attributes,
- user_properties,frag_properties,storage_properties,
- cookie,version],
- CsList = rec2list(Tags, Orig, 2, Cs),
- case proplists:get_value(index, CsList, []) of
- [] -> CsList;
- NewFormat ->
- OldFormat = [Pos || {Pos, _Pref} <- NewFormat],
- lists:keyreplace(index, 1, CsList, {index, OldFormat})
- end.
-
rec2list([index | Tags], [index|Orig], Pos, Rec) ->
Val = element(Pos, Rec),
[{index, lists:map(
@@ -796,19 +780,8 @@ rec2list([], _, _Pos, _Rec) ->
rec2list(Tags, [_|Orig], Pos, Rec) ->
rec2list(Tags, Orig, Pos+1, Rec).
-normalize_cs(Cstructs, Node) ->
- %% backward-compatibility hack; normalize before returning
- case need_old_cstructs([Node]) of
- false ->
- Cstructs;
- Version ->
- %% some other format
- [convert_cs(Version, Cs) || Cs <- Cstructs]
- end.
-
-convert_cs(Version, Cs) ->
- Fields = [Value || {_, Value} <- cs2list(Version, Cs)],
- list_to_tuple([cstruct|Fields]).
+normalize_cs(Cstructs, _Node) ->
+ Cstructs.
list2cs(List) ->
list2cs(List, get_ext_types()).
@@ -1864,11 +1837,7 @@ do_move_table(schema, _FromNode, _ToNode) ->
mnesia:abort({bad_type, schema});
do_move_table(Tab, FromNode, ToNode) when is_atom(FromNode), is_atom(ToNode) ->
TidTs = get_tid_ts_and_lock(schema, write),
- AnyOld = lists:any(fun(Node) -> mnesia_monitor:needs_protocol_conversion(Node) end,
- [ToNode|val({Tab, where_to_write})]),
- if AnyOld -> ignore; %% Leads to deadlock on old nodes
- true -> get_tid_ts_and_lock(Tab, write)
- end,
+ get_tid_ts_and_lock(Tab, write),
insert_schema_ops(TidTs, make_move_table(Tab, FromNode, ToNode));
do_move_table(Tab, FromNode, ToNode) ->
mnesia:abort({badarg, Tab, FromNode, ToNode}).
@@ -3438,15 +3407,14 @@ do_merge_schema(LockTabs0) ->
mnesia_lib:intersect(Ns,NeedsLock))
|| {T,Ns} <- LockTabs],
- NeedsConversion = need_old_cstructs(NeedsLock ++ LockedAlready),
{value, SchemaCs} = lists:keysearch(schema, #cstruct.name, Cstructs),
- SchemaDef = cs2list(NeedsConversion, SchemaCs),
+ SchemaDef = cs2list(false, SchemaCs),
%% Announce that Node is running
A = [{op, announce_im_running, node(), SchemaDef, Running, RemoteRunning}],
do_insert_schema_ops(Store, A),
%% Introduce remote tables to local node
- do_insert_schema_ops(Store, make_merge_schema(Node, NeedsConversion, Cstructs)),
+ do_insert_schema_ops(Store, make_merge_schema(Node, false, Cstructs)),
%% Introduce local tables to remote nodes
Tabs = val({schema, tables}),
@@ -3471,23 +3439,7 @@ do_merge_schema(LockTabs0) ->
end.
fetch_cstructs(Node) ->
- Convert = mnesia_monitor:needs_protocol_conversion(Node),
- case rpc:call(Node, mnesia_controller, get_remote_cstructs, []) of
- {cstructs, Cs0, RemoteRunning1} when Convert ->
- {cstructs, [list2cs(cs2list(Cs)) || Cs <- Cs0], RemoteRunning1};
- Result ->
- Result
- end.
-
-need_old_cstructs() ->
- need_old_cstructs(val({schema, where_to_write})).
-
-need_old_cstructs(Nodes) ->
- Filter = fun(Node) -> mnesia_monitor:needs_protocol_conversion(Node) end,
- case lists:filter(Filter, Nodes) of
- [] -> false;
- Ns -> lists:min([element(1, ?catch_val({protocol, Node})) || Node <- Ns])
- end.
+ rpc:call(Node, mnesia_controller, get_remote_cstructs, []).
tab_to_nodes(Tab) when is_atom(Tab) ->
Cs = val({Tab, cstruct}),
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 8b79fca1d7..8a4113422a 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -26,7 +26,7 @@
init/1,
non_transaction/5,
transaction/6,
- commit_participant/5,
+ commit_participant/6,
dirty/2,
display_info/2,
do_update_op/3,
@@ -62,13 +62,14 @@
%% Format on coordinators is [{Tid, EtsTabList} .....
-record(prep, {protocol = sym_trans,
- %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans
+ %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans | sync_asym_trans
records = [],
prev_tab = [], % initiate to a non valid table name
prev_types,
prev_snmp,
types,
- majority = []
+ majority = [],
+ sync = false
}).
-record(participant, {tid, pid, commit, disc_nodes = [],
@@ -250,11 +251,13 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
Commit = new_cr_format(Commit0),
Pid =
- case Protocol of
- asym_trans when node(Tid#tid.pid) /= node() ->
- Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
+ if
+ node(Tid#tid.pid) =:= node() ->
+ error({internal_error, local_node});
+ Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
+ Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
spawn_link(?MODULE, commit_participant, Args);
- _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
+ true -> %% *_sym_trans
reply(From, {vote_yes, Tid}),
nopid
end,
@@ -1190,7 +1193,15 @@ do_arrange(Tid, Store, RestoreKey, Prep, N) when RestoreKey == restore_op ->
P2 = Prep#prep{protocol = asym_trans, records = Recs2},
do_arrange(Tid, Store, ?ets_next(Store, RestoreKey), P2, N + 1);
do_arrange(_Tid, _Store, '$end_of_table', Prep, N) ->
- {N, Prep};
+ case Prep of
+ #prep{sync=true, protocol=asym_trans} ->
+ {N, Prep#prep{protocol=sync_asym_trans}};
+ _ ->
+ {N, Prep}
+ end;
+do_arrange(Tid, Store, sticky, Prep, N) ->
+ P2 = Prep#prep{sync=true},
+ do_arrange(Tid, Store, ?ets_next(Store, sticky), P2, N);
do_arrange(Tid, Store, IgnoredKey, Prep, N) -> %% locks, nodes ... local atoms...
do_arrange(Tid, Store, ?ets_next(Store, IgnoredKey), Prep, N).
@@ -1448,7 +1459,8 @@ multi_commit(sync_sym_trans, _Maj = [], Tid, CR, Store) ->
[{tid, Tid}, {outcome, Outcome}]),
Outcome;
-multi_commit(asym_trans, Majority, Tid, CR, Store) ->
+multi_commit(Protocol, Majority, Tid, CR, Store)
+ when Protocol =:= asym_trans; Protocol =:= sync_asym_trans ->
%% This more expensive commit protocol is used when
%% table definitions are changed (schema transactions).
%% It is also used when the involved tables are
@@ -1515,7 +1527,7 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) ->
end,
Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
?ets_insert(Store, Pending),
- {WaitFor, Local} = ask_commit(asym_trans, Tid, CR2, DiscNs, RamNs),
+ {WaitFor, Local} = ask_commit(Protocol, Tid, CR2, DiscNs, RamNs),
SchemaPrep = ?CATCH(mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})),
{Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []),
@@ -1563,38 +1575,38 @@ multi_commit(asym_trans, Majority, Tid, CR, Store) ->
%% Returns do_commit or {do_abort, Reason}
rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode,
- GoodPids, SchemaAckPids) ->
+ GoodPids, AckPids) ->
receive
{?MODULE, _, {acc_pre_commit, Tid, Pid, true}} ->
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
+ [Pid | GoodPids], [Pid | AckPids]);
{?MODULE, _, {acc_pre_commit, Tid, Pid, false}} ->
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], SchemaAckPids);
+ [Pid | GoodPids], AckPids);
{?MODULE, _, {acc_pre_commit, Tid, Pid}} ->
%% Kept for backwards compatibility. Remove after Mnesia 4.x
rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
+ [Pid | GoodPids], [Pid | AckPids]);
{?MODULE, _, {do_abort, Tid, Pid, _Reason}} ->
AbortRes = {do_abort, {bad_commit, node(Pid)}},
rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
- GoodPids, SchemaAckPids);
+ GoodPids, AckPids);
{mnesia_down, Node} when Node == node(Pid) ->
AbortRes = {do_abort, {bad_commit, Node}},
?SAFE(Pid ! {Tid, AbortRes}), %% Tell him that he has died
rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
- GoodPids, SchemaAckPids)
+ GoodPids, AckPids)
end;
-rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, SchemaAckPids) ->
+rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, AckPids) ->
D = Commit#commit.decision,
case Res of
do_commit ->
%% Now everybody knows that the others
%% has voted yes. We also know that
%% everybody are uncertain.
- prepare_sync_schema_commit(Store, SchemaAckPids),
+ prepare_sync_schema_commit(Store, AckPids),
tell_participants(GoodPids, {Tid, committed}),
D2 = D#decision{outcome = committed},
mnesia_recover:log_decision(D2),
@@ -1606,7 +1618,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc
do_commit(Tid, Commit, DumperMode),
?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit},
[{tid, Tid}]),
- sync_schema_commit(Tid, Store, SchemaAckPids),
+ sync_schema_commit(Tid, Store, AckPids),
mnesia_locker:release_tid(Tid),
?MODULE ! {delete_transaction, Tid};
@@ -1623,6 +1635,7 @@ rec_acc_pre_commit([], Tid, Store, {Commit,OrigC}, Res, DumperMode, GoodPids, Sc
Res.
%% Note all nodes in case of mnesia_down mgt
+%% sync_schema_commit is (ab)used for sync_asym_trans as well.
prepare_sync_schema_commit(_Store, []) ->
ok;
prepare_sync_schema_commit(Store, [Pid | Pids]) ->
@@ -1648,17 +1661,17 @@ tell_participants([Pid | Pids], Msg) ->
tell_participants([], _Msg) ->
ok.
--spec commit_participant(_, _, _, _, _) -> no_return().
+-spec commit_participant(_, _, _, _, _, _) -> no_return().
%% Trap exit because we can get a shutdown from application manager
-commit_participant(Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
+commit_participant(Protocol, Coord, Tid, Bin, DiscNs, RamNs) when is_binary(Bin) ->
process_flag(trap_exit, true),
Commit = binary_to_term(Bin),
- commit_participant(Coord, Tid, Bin, Commit, DiscNs, RamNs);
-commit_participant(Coord, Tid, C = #commit{}, DiscNs, RamNs) ->
+ commit_participant(Protocol, Coord, Tid, Bin, Commit, DiscNs, RamNs);
+commit_participant(Protocol, Coord, Tid, C = #commit{}, DiscNs, RamNs) ->
process_flag(trap_exit, true),
- commit_participant(Coord, Tid, C, C, DiscNs, RamNs).
+ commit_participant(Protocol, Coord, Tid, C, C, DiscNs, RamNs).
-commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
+commit_participant(Protocol, Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]),
try mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of
{Modified, C = #commit{}, DumperMode} ->
@@ -1683,8 +1696,9 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
mnesia_recover:log_decision(D#decision{outcome = unclear}),
?eval_debug_fun({?MODULE, commit_participant, pre_commit},
[{tid, Tid}]),
- Expect_schema_ack = C#commit.schema_ops /= [],
- reply(Coord, {acc_pre_commit, Tid, self(), Expect_schema_ack}),
+ ExpectAck = C#commit.schema_ops /= []
+ orelse Protocol =:= sync_asym_trans,
+ reply(Coord, {acc_pre_commit, Tid, self(), ExpectAck}),
%% Now we are vulnerable for failures, since
%% we cannot decide without asking others
@@ -1694,7 +1708,7 @@ commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
?eval_debug_fun({?MODULE, commit_participant, log_commit},
[{tid, Tid}]),
do_commit(Tid, C, DumperMode),
- case Expect_schema_ack of
+ case ExpectAck of
false -> ignore;
true -> reply(Coord, {schema_commit, Tid, self()})
end,
@@ -1978,7 +1992,7 @@ sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
Res = do_dirty(Tid, Head),
{WF, Res};
true ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor])
end;
sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
@@ -1997,11 +2011,11 @@ async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
NewRes = do_dirty(Tid, Head),
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes);
ReadNode == Node ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
NewRes = {'EXIT', {aborted, {node_not_running, Node}}},
async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes);
true ->
- {?MODULE, Node} ! {self(), {async_dirty, Tid, ext_format(Head), Tab}},
+ {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}},
async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
end;
async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
@@ -2058,24 +2072,20 @@ ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
Node == node() ->
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
true ->
- CR = ext_format(Head),
- Msg = {ask_commit, Protocol, Tid, CR, DiscNs, RamNs},
+ Msg = {ask_commit, convert_old(Protocol, Node), Tid, Head, DiscNs, RamNs},
{?MODULE, Node} ! {self(), Msg},
ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
end;
ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
{WaitFor, Local}.
-ext_format(#commit{ext=[]}=CR) -> CR;
-ext_format(#commit{node=Node, ext=Ext}=CR) ->
+convert_old(sync_asym_trans, Node) ->
case mnesia_monitor:needs_protocol_conversion(Node) of
- true ->
- case lists:keyfind(snmp, 1, Ext) of
- false -> CR#commit{ext=[]};
- {snmp, List} -> CR#commit{ext=List}
- end;
- false -> CR
- end.
+ true -> asym_trans;
+ false -> sync_asym_trans
+ end;
+convert_old(Protocol, _) ->
+ Protocol.
new_cr_format(#commit{ext=[]}=Cr) -> Cr;
new_cr_format(#commit{ext=[{_,_}|_]}=Cr) -> Cr;
@@ -2304,7 +2314,7 @@ reconfigure_participants(_, []) ->
%% tell mnesia_tm on all involved nodes (including the local node)
%% about the outcome.
tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
- Outcome = mnesia_recover:what_happened(Tid, Protocol, CheckNodes),
+ Outcome = mnesia_recover:what_happened(Tid, proto(Protocol), CheckNodes),
case Outcome of
aborted ->
rpc:abcast(TellNodes, ?MODULE, {Tid,{do_abort, {mnesia_down, Node}}});
@@ -2313,6 +2323,9 @@ tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
end,
Outcome.
+proto(sync_asym_trans) -> asym_trans;
+proto(Proto) -> Proto.
+
do_stop(#state{coordinators = Coordinators}) ->
Msg = {mnesia_down, node()},
lists:foreach(fun({Tid, _}) -> Tid#tid.pid ! Msg end, gb_trees:to_list(Coordinators)),