diff options
-rw-r--r-- | lib/mnesia/src/mnesia_controller.erl | 18 | ||||
-rw-r--r-- | lib/mnesia/src/mnesia_loader.erl | 36 | ||||
-rw-r--r-- | lib/mnesia/src/mnesia_locker.erl | 20 | ||||
-rw-r--r-- | lib/mnesia/src/mnesia_monitor.erl | 8 | ||||
-rw-r--r-- | lib/mnesia/src/mnesia_schema.erl | 104 |
5 files changed, 75 insertions, 111 deletions
diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl index 02c175760e..69ccc1d2c9 100644 --- a/lib/mnesia/src/mnesia_controller.erl +++ b/lib/mnesia/src/mnesia_controller.erl @@ -78,7 +78,7 @@ change_table_majority/1, del_active_replica/2, wait_for_tables/2, - get_network_copy/2, + get_network_copy/3, merge_schema/0, start_remote_sender/4, schedule_late_disc_load/2 @@ -329,14 +329,14 @@ release_schema_commit_lock() -> unlink(whereis(?SERVER_NAME)). %% Special for preparation of add table copy -get_network_copy(Tab, Cs) -> +get_network_copy(Tid, Tab, Cs) -> % We can't let the controller queue this one % because that may cause a deadlock between schema_operations % and initial tableloadings which both takes schema locks. % But we have to get copier_done msgs when the other side % goes down. call({add_other, self()}), - Reason = {dumper,add_table_copy}, + Reason = {dumper,{add_table_copy, Tid}}, Work = #net_load{table = Tab,reason = Reason,cstruct = Cs}, %% I'll need this cause it's linked trough the subscriber %% might be solved by using monitor in subscr instead. @@ -775,7 +775,7 @@ handle_call({net_load, Tab, Cs}, From, State) -> true -> Worker = #net_load{table = Tab, opt_reply_to = From, - reason = {dumper,add_table_copy}, + reason = {dumper,{add_table_copy, unknown}}, cstruct = Cs }, add_worker(Worker, State); @@ -1180,11 +1180,11 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> Done#loader_done.needs_announce == true, Done#loader_done.needs_reply == true -> i_have_tab(Tab), - %% Should be {dumper,add_table_copy} only + %% Should be {dumper,{add_table_copy, _}} only reply(Done#loader_done.reply_to, Done#loader_done.reply); Done#loader_done.needs_reply == true -> - %% Should be {dumper,add_table_copy} only + %% Should be {dumper,{add_table_copy,_}} only reply(Done#loader_done.reply_to, Done#loader_done.reply); Done#loader_done.needs_announce == true, Tab == schema -> @@ -2148,6 +2148,10 @@ load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=Repl reply_to = ReplyTo, reply = {loaded, ok} }, + AddTableCopy = case Reason of + {dumper,{add_table_copy,_}} -> true; + _ -> false + end, if ReadNode == node() -> %% Already loaded locally @@ -2157,7 +2161,7 @@ load_table_fun(#net_load{cstruct=Cs, table=Tab, reason=Reason, opt_reply_to=Repl Res = mnesia_loader:disc_load_table(Tab, load_local_content), Done#loader_done{reply = Res, needs_announce = true, needs_sync = true} end; - AccessMode == read_only, Reason /= {dumper,add_table_copy} -> + AccessMode == read_only, not AddTableCopy -> fun() -> disc_load_table(Tab, Reason, ReplyTo) end; true -> fun() -> diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl index da8549be50..41fcd76fcb 100644 --- a/lib/mnesia/src/mnesia_loader.erl +++ b/lib/mnesia/src/mnesia_loader.erl @@ -180,8 +180,7 @@ do_get_disc_copy2(Tab, Reason, Storage, Type) when Storage == disc_only_copies - -define(MAX_RAM_TRANSFERS, (?MAX_RAM_FILE_SIZE div ?MAX_TRANSFER_SIZE) + 1). -define(MAX_NOPACKETS, 20). -net_load_table(Tab, Reason, Ns, Cs) - when Reason == {dumper,add_table_copy} -> +net_load_table(Tab, {dumper,{add_table_copy, _}}=Reason, Ns, Cs) -> try_net_load_table(Tab, Reason, Ns, Cs); net_load_table(Tab, Reason, Ns, _Cs) -> try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})). @@ -233,7 +232,8 @@ do_snmpify(Tab, Us, Storage) -> set({Tab, {index, snmp}}, Snmp). %% Start the recieiver -init_receiver(Node, Tab, Storage, Cs, Reas={dumper,add_table_copy}) -> +init_receiver(Node, Tab, Storage, Cs, Reas={dumper,{add_table_copy, Tid}}) -> + rpc:call(Node, mnesia_lib, set, [{?MODULE, active_trans}, Tid]), case start_remote_sender(Node, Tab, Storage) of {SenderPid, TabSize, DetsData} -> start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,Reas); @@ -307,7 +307,7 @@ table_init_fun(SenderPid) -> end. %% Add_table_copy get's it's own locks. -start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,add_table_copy}) -> +start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,{dumper,{add_table_copy,_}}) -> Init = table_init_fun(SenderPid), case do_init_table(Tab,Storage,Cs,SenderPid,TabSize,DetsData,self(), Init) of Err = {error, _} -> @@ -658,9 +658,10 @@ send_table(Pid, Tab, RemoteS) -> {Init, Chunk} = reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer), SendIt = fun() -> - {atomic, ok} = prepare_copy(Pid, Tab, Storage), + NeedLock = need_lock(Tab), + {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock), send_more(Pid, 1, Chunk, Init(), Tab), - finish_copy(Pid, Tab, Storage, RemoteS) + finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) end, try SendIt() of @@ -678,10 +679,10 @@ send_table(Pid, Tab, RemoteS) -> end end. -prepare_copy(Pid, Tab, Storage) -> +prepare_copy(Pid, Tab, Storage, NeedLock) -> Trans = fun() -> - mnesia:lock_table(Tab, load), + NeedLock andalso mnesia:lock_table(Tab, load), mnesia_subscr:subscribe(Pid, {table, Tab}), update_where_to_write(Tab, node(Pid)), mnesia_lib:db_fixtable(Storage, Tab, true), @@ -689,6 +690,21 @@ prepare_copy(Pid, Tab, Storage) -> end, mnesia:transaction(Trans). + +need_lock(Tab) -> + case ?catch_val({?MODULE, active_trans}) of + #tid{} = Tid -> + %% move_table_copy grabs it's own table-lock + %% do not deadlock with it + mnesia_lib:unset({?MODULE, active_trans}), + case mnesia_locker:get_held_locks(Tab) of + [{write, Tid}|_] -> false; + _Locks -> true + end; + _ -> + true + end. + update_where_to_write(Tab, Node) -> case val({Tab, access_mode}) of read_only -> @@ -783,12 +799,12 @@ send_packet(N, Pid, Chunk, {Recs, Cont}) when N < ?MAX_NOPACKETS -> send_packet(_N, _Pid, _Chunk, DataState) -> DataState. -finish_copy(Pid, Tab, Storage, RemoteS) -> +finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) -> RecNode = node(Pid), DatBin = dat2bin(Tab, Storage, RemoteS), Trans = fun() -> - mnesia:read_lock_table(Tab), + NeedLock andalso mnesia:read_lock_table(Tab), A = val({Tab, access_mode}), mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A), cleanup_tab_copier(Pid, Storage, Tab), diff --git a/lib/mnesia/src/mnesia_locker.erl b/lib/mnesia/src/mnesia_locker.erl index 89feeba2c3..5766f22e92 100644 --- a/lib/mnesia/src/mnesia_locker.erl +++ b/lib/mnesia/src/mnesia_locker.erl @@ -22,7 +22,7 @@ -module(mnesia_locker). -export([ - get_held_locks/0, + get_held_locks/0, get_held_locks/1, get_lock_queue/0, global_lock/5, ixrlock/5, @@ -236,6 +236,11 @@ loop(State) -> From ! {Ref, ok}, loop(State); + {From, {is_locked, Oid}} -> + Held = ?ets_lookup(mnesia_held_locks, Oid), + reply(From, Held), + loop(State); + {'EXIT', Pid, _} when Pid == State#state.supervisor -> do_stop(); @@ -1151,6 +1156,19 @@ get_held_locks() -> Locks = receive {mnesia_held_locks, Ls} -> Ls after 5000 -> [] end, rewrite_locks(Locks, []). +%% Mnesia internal usage only +get_held_locks(Tab) when is_atom(Tab) -> + Oid = {Tab, ?ALL}, + ?MODULE ! {self(), {is_locked, Oid}}, + receive + {?MODULE, _Node, Locks} -> + case Locks of + [] -> []; + [{Oid, _Prev, What}] -> What + end + end. + + rewrite_locks([{Oid, _, Ls}|Locks], Acc0) -> Acc = rewrite_locks(Ls, Oid, Acc0), rewrite_locks(Locks, Acc); diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl index 8313c3bda5..081d746257 100644 --- a/lib/mnesia/src/mnesia_monitor.erl +++ b/lib/mnesia/src/mnesia_monitor.erl @@ -82,9 +82,9 @@ going_down = [], tm_started = false, early_connects = [], connecting, mq = [], remote_node_status = []}). --define(current_protocol_version, {8,1}). +-define(current_protocol_version, {8,2}). --define(previous_protocol_version, {8,0}). +-define(previous_protocol_version, {8,1}). start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, @@ -193,7 +193,7 @@ protocol_version() -> %% A sorted list of acceptable protocols the %% preferred protocols are first in the list acceptable_protocol_versions() -> - [protocol_version(), ?previous_protocol_version, {7,6}]. + [protocol_version(), ?previous_protocol_version]. needs_protocol_conversion(Node) -> case {?catch_val({protocol, Node}), protocol_version()} of @@ -424,8 +424,6 @@ handle_call({negotiate_protocol, Mon, Version, Protocols}, From, State) case hd(Protocols) of ?previous_protocol_version -> accept_protocol(Mon, MyVersion, ?previous_protocol_version, From, State); - {7,6} -> - accept_protocol(Mon, MyVersion, {7,6}, From, State); _ -> verbose("Connection with ~p rejected. " "version = ~p, protocols = ~p, " diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl index 2acddccf6e..1bd8703e1d 100644 --- a/lib/mnesia/src/mnesia_schema.erl +++ b/lib/mnesia/src/mnesia_schema.erl @@ -646,54 +646,18 @@ cs2list(Cs) when is_record(Cs, cstruct) -> rec2list(Tags, Tags, 2, Cs); cs2list(CreateList) when is_list(CreateList) -> CreateList; -%% 4.6 + +%% since 4.6 cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 19 -> Tags = [name,type,ram_copies,disc_copies,disc_only_copies, load_order,access_mode,majority,index,snmp,local_content, record_name,attributes, user_properties,frag_properties,storage_properties, cookie,version], - rec2list(Tags, Tags, 2, Cs); -%% 4.4.19 -cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 18 -> - Tags = [name,type,ram_copies,disc_copies,disc_only_copies, - load_order,access_mode,majority,index,snmp,local_content, - record_name,attributes,user_properties,frag_properties, - cookie,version], - rec2list(Tags, Tags, 2, Cs); -%% 4.4.18 and earlier -cs2list(Cs) when element(1, Cs) == cstruct, tuple_size(Cs) == 17 -> - Tags = [name,type,ram_copies,disc_copies,disc_only_copies, - load_order,access_mode,index,snmp,local_content, - record_name,attributes,user_properties,frag_properties, - cookie,version], rec2list(Tags, Tags, 2, Cs). cs2list(false, Cs) -> - cs2list(Cs); -cs2list(ver4_4_18, Cs) -> %% Or earlier - Orig = record_info(fields, cstruct), - Tags = [name,type,ram_copies,disc_copies,disc_only_copies, - load_order,access_mode,index,snmp,local_content, - record_name,attributes,user_properties,frag_properties, - cookie,version], - rec2list(Tags, Orig, 2, Cs); -cs2list(ver4_4_19, Cs) -> - Orig = record_info(fields, cstruct), - Tags = [name,type,ram_copies,disc_copies,disc_only_copies, - load_order,access_mode,majority,index,snmp,local_content, - record_name,attributes,user_properties,frag_properties, - cookie,version], - rec2list(Tags, Orig, 2, Cs); -cs2list(ver4_6, Cs) -> - Orig = record_info(fields, cstruct), - Tags = [name,type,ram_copies,disc_copies,disc_only_copies, - load_order,access_mode,majority,index,snmp,local_content, - record_name,attributes, - user_properties,frag_properties,storage_properties, - cookie,version], - rec2list(Tags, Orig, 2, Cs). - + cs2list(Cs). rec2list([Tag | Tags], [Tag | Orig], Pos, Rec) -> Val = element(Pos, Rec), @@ -703,19 +667,8 @@ rec2list([], _, _Pos, _Rec) -> rec2list(Tags, [_|Orig], Pos, Rec) -> rec2list(Tags, Orig, Pos+1, Rec). -normalize_cs(Cstructs, Node) -> - %% backward-compatibility hack; normalize before returning - case need_old_cstructs([Node]) of - false -> - Cstructs; - Version -> - %% some other format - [convert_cs(Version, Cs) || Cs <- Cstructs] - end. - -convert_cs(Version, Cs) -> - Fields = [Value || {_, Value} <- cs2list(Version, Cs)], - list_to_tuple([cstruct|Fields]). +normalize_cs(Cstructs, _Node) -> + Cstructs. list2cs(List) when is_list(List) -> Name = pick(unknown, name, List, must), @@ -1357,7 +1310,11 @@ do_move_table(schema, _FromNode, _ToNode) -> mnesia:abort({bad_type, schema}); do_move_table(Tab, FromNode, ToNode) when is_atom(FromNode), is_atom(ToNode) -> TidTs = get_tid_ts_and_lock(schema, write), - %% get_tid_ts_and_lock(Tab, write), write locked by load_table in mnesia_loader + AnyOld = lists:any(fun(Node) -> mnesia_monitor:needs_protocol_conversion(Node) end, + [ToNode|val({Tab, where_to_write})]), + if AnyOld -> ignore; %% Leads to deadlock on old nodes + true -> get_tid_ts_and_lock(Tab, write) + end, insert_schema_ops(TidTs, make_move_table(Tab, FromNode, ToNode)); do_move_table(Tab, FromNode, ToNode) -> mnesia:abort({badarg, Tab, FromNode, ToNode}). @@ -1965,7 +1922,7 @@ prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) -> end, %% Tables are created by mnesia_loader get_network code insert_cstruct(Tid, Cs, true), - case mnesia_controller:get_network_copy(Tab, Cs) of + case mnesia_controller:get_network_copy(Tid, Tab, Cs) of {loaded, ok} -> {true, optional}; {not_loaded, ErrReason} -> @@ -2373,7 +2330,7 @@ undo_prepare_op(_Tid, {op, del_table_copy, _, Node, TabDef}) -> case mnesia_lib:val({Tab, where_to_read}) of nowhere -> mnesia_lib:set_remote_where_to_read(Tab); - true -> + _ -> ignore end end; @@ -2837,40 +2794,11 @@ do_merge_schema(LockTabs0) -> end. fetch_cstructs(Node) -> - case need_old_cstructs([Node]) of - false -> - rpc:call(Node, mnesia_controller, get_remote_cstructs, []); - _Ver -> - case rpc:call(Node, mnesia_controller, get_cstructs, []) of - {cstructs, Cs0, RR} -> - {cstructs, [list2cs(cs2list(Cs)) || Cs <- Cs0], RR}; - Err -> Err - end - end. + rpc:call(Node, mnesia_controller, get_remote_cstructs, []). -need_old_cstructs() -> - need_old_cstructs(val({schema, where_to_write})). - -need_old_cstructs(Nodes) -> - Filter = fun(Node) -> not mnesia_monitor:needs_protocol_conversion(Node) end, - case lists:dropwhile(Filter, Nodes) of - [] -> false; - [Node|_] -> - case rpc:call(Node, mnesia_lib, val, [{schema,cstruct}]) of - #cstruct{} -> - %% mnesia_lib:warning("Mnesia on ~p do not need to convert cstruct (~p)~n", - %% [node(), Node]), - false; - {badrpc, _} -> - need_old_cstructs(lists:delete(Node,Nodes)); - Cs when element(1, Cs) == cstruct, tuple_size(Cs) == 17 -> - ver4_4_18; % Without majority - Cs when element(1, Cs) == cstruct, tuple_size(Cs) == 18 -> - ver4_4_19; % With majority - Cs when element(1, Cs) == cstruct, tuple_size(Cs) == 19 -> - ver4_6 % With storage_properties - end - end. +need_old_cstructs() -> false. + +need_old_cstructs(_Nodes) -> false. tab_to_nodes(Tab) when is_atom(Tab) -> Cs = val({Tab, cstruct}), |