diff options
Diffstat (limited to 'lib/mnesia/src/mnesia_locker.erl')
-rw-r--r-- | lib/mnesia/src/mnesia_locker.erl | 376 |
1 files changed, 195 insertions, 181 deletions
diff --git a/lib/mnesia/src/mnesia_locker.erl b/lib/mnesia/src/mnesia_locker.erl index 0492d794f3..a22c95d454 100644 --- a/lib/mnesia/src/mnesia_locker.erl +++ b/lib/mnesia/src/mnesia_locker.erl @@ -1,19 +1,19 @@ %% %% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1996-2011. All Rights Reserved. -%% +%% +%% Copyright Ericsson AB 1996-2012. All Rights Reserved. +%% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. -%% +%% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. -%% +%% %% %CopyrightEnd% %% @@ -66,14 +66,14 @@ -record(queue, {oid, tid, op, pid, lucky}). -%% mnesia_held_locks: contain {Oid, Op, Tid} entries (bag) +%% mnesia_held_locks: contain {Oid, MaxLock, [{Op, Tid}]} entries -define(match_oid_held_locks(Oid), {Oid, '_', '_'}). -%% mnesia_tid_locks: contain {Tid, Oid, Op} entries (bag) +%% mnesia_tid_locks: contain {Tid, Oid, Op} entries (bag) -define(match_oid_tid_locks(Tid), {Tid, '_', '_'}). %% mnesia_sticky_locks: contain {Oid, Node} entries and {Tab, Node} entries (set) -define(match_oid_sticky_locks(Oid),{Oid, '_'}). %% mnesia_lock_queue: contain {queue, Oid, Tid, Op, ReplyTo, WaitForTid} entries (bag) --define(match_oid_lock_queue(Oid), #queue{oid=Oid, tid='_', op = '_', pid = '_', lucky = '_'}). +-define(match_oid_lock_queue(Oid), #queue{oid=Oid, tid='_', op = '_', pid = '_', lucky = '_'}). %% mnesia_lock_counter: {{write, Tab}, Number} && %% {{read, Tab}, Number} entries (set) @@ -83,11 +83,11 @@ start() -> init(Parent) -> register(?MODULE, self()), process_flag(trap_exit, true), - ?ets_new_table(mnesia_held_locks, [bag, private, named_table]), + ?ets_new_table(mnesia_held_locks, [ordered_set, private, named_table]), ?ets_new_table(mnesia_tid_locks, [bag, private, named_table]), ?ets_new_table(mnesia_sticky_locks, [set, private, named_table]), ?ets_new_table(mnesia_lock_queue, [bag, private, named_table, {keypos, 2}]), - + proc_lib:init_ack(Parent, {ok, self()}), case ?catch_val(pid_sort_order) of r9b_plain -> put(pid_sort_order, r9b_plain); @@ -98,8 +98,8 @@ init(Parent) -> val(Var) -> case ?catch_val(Var) of - {'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_); - _VaLuE_ -> _VaLuE_ + {'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_); + _VaLuE_ -> _VaLuE_ end. reply(From, R) -> @@ -111,10 +111,10 @@ l_request(Node, X, Store) -> l_req_rec(Node, Store) -> ?ets_insert(Store, {nodes, Node}), - receive - {?MODULE, Node, Reply} -> + receive + {?MODULE, Node, Reply} -> Reply; - {mnesia_down, Node} -> + {mnesia_down, Node} -> {not_granted, {node_not_running, Node}} end. @@ -128,10 +128,10 @@ send_release_tid(Nodes, Tid) -> rpc:abcast(Nodes, ?MODULE, {self(), {sync_release_tid, Tid}}). receive_release_tid_acc([Node | Nodes], Tid) -> - receive - {?MODULE, Node, {tid_released, Tid}} -> + receive + {?MODULE, Node, {tid_released, Tid}} -> receive_release_tid_acc(Nodes, Tid); - {mnesia_down, Node} -> + {mnesia_down, Node} -> receive_release_tid_acc(Nodes, Tid) end; receive_release_tid_acc([], _Tid) -> @@ -152,27 +152,27 @@ loop(State) -> %% Really do a read, but get hold of a write lock %% used by mnesia:wread(Oid). - + {From, {read_write, Tid, Oid}} -> try_sticky_lock(Tid, read_write, From, Oid), loop(State); - + %% Tid has somehow terminated, clear up everything %% and pass locks on to queued processes. %% This is the purpose of the mnesia_tid_locks table - + {release_tid, Tid} -> do_release_tid(Tid), loop(State); - + %% stick lock, first tries this to the where_to_read Node {From, {test_set_sticky, Tid, {Tab, _} = Oid, Lock}} -> case ?ets_lookup(mnesia_sticky_locks, Tab) of - [] -> + [] -> reply(From, not_stuck), loop(State); [{_,Node}] when Node == node() -> - %% Lock is stuck here, see now if we can just set + %% Lock is stuck here, see now if we can just set %% a regular write lock try_lock(Tid, Lock, From, Oid), loop(State); @@ -188,7 +188,7 @@ loop(State) -> ?ets_insert(mnesia_sticky_locks, {Tab, N}), loop(State); - %% The caller which sends this message, must have first + %% The caller which sends this message, must have first %% aquired a write lock on the entire table {unstick, Tab} -> ?ets_delete(mnesia_sticky_locks, Tab), @@ -205,14 +205,14 @@ loop(State) -> [{_,N}] -> Req = {From, {ix_read, Tid, Tab, IxKey, Pos}}, From ! {?MODULE, node(), {switch, N, Req}}, - loop(State) + loop(State) end; {From, {sync_release_tid, Tid}} -> do_release_tid(Tid), reply(From, {tid_released, Tid}), loop(State); - + {release_remote_non_pending, Node, Pending} -> release_remote_non_pending(Node, Pending), mnesia_monitor:mnesia_down(?MODULE, Node), @@ -229,16 +229,23 @@ loop(State) -> {get_table, From, LockTable} -> From ! {LockTable, ?ets_match_object(LockTable, '_')}, loop(State); - + Msg -> error("~p got unexpected message: ~p~n", [?MODULE, Msg]), loop(State) end. -set_lock(Tid, Oid, Op) -> - ?dbg("Granted ~p ~p ~p~n", [Tid,Oid,Op]), - ?ets_insert(mnesia_held_locks, {Oid, Op, Tid}), - ?ets_insert(mnesia_tid_locks, {Tid, Oid, Op}). +set_lock(Tid, Oid, Op, []) -> + ?ets_insert(mnesia_tid_locks, {Tid, Oid, Op}), + ?ets_insert(mnesia_held_locks, {Oid, Op, [{Op, Tid}]}); +set_lock(Tid, Oid, read, [{Oid, Prev, Items}]) -> + ?ets_insert(mnesia_tid_locks, {Tid, Oid, read}), + ?ets_insert(mnesia_held_locks, {Oid, Prev, [{read, Tid}|Items]}); +set_lock(Tid, Oid, write, [{Oid, _Prev, Items}]) -> + ?ets_insert(mnesia_tid_locks, {Tid, Oid, write}), + ?ets_insert(mnesia_held_locks, {Oid, write, [{write, Tid}|Items]}); +set_lock(Tid, Oid, Op, undefined) -> + set_lock(Tid, Oid, Op, ?ets_lookup(mnesia_held_locks, Oid)). %%%%%%%%%%%%%%%%%%%%%%%%%%% %% Acquire locks @@ -261,32 +268,32 @@ try_lock(Tid, Op, Pid, Oid) -> try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) -> case can_lock(Tid, Lock, Oid, {no, bad_luck}) of - yes -> - Reply = grant_lock(Tid, SimpleOp, Lock, Oid), + {yes, Default} -> + Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default), reply(Pid, Reply); - {no, Lucky} -> + {{no, Lucky},_} -> C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky}, ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), reply(Pid, {not_granted, C}); - {queue, Lucky} -> + {{queue, Lucky},_} -> ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), %% Append to queue: Nice place for trace output - ?ets_insert(mnesia_lock_queue, - #queue{oid = Oid, tid = Tid, op = Op, + ?ets_insert(mnesia_lock_queue, + #queue{oid = Oid, tid = Tid, op = Op, pid = Pid, lucky = Lucky}), ?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}}) end. -grant_lock(Tid, read, Lock, Oid = {Tab, Key}) +grant_lock(Tid, read, Lock, Oid = {Tab, Key}, Default) when Key /= ?ALL, Tab /= ?GLOBAL -> case node(Tid#tid.pid) == node() of true -> - set_lock(Tid, Oid, Lock), + set_lock(Tid, Oid, Lock, Default), {granted, lookup_in_client}; false -> try Val = mnesia_lib:db_get(Tab, Key), %% lookup as well - set_lock(Tid, Oid, Lock), + set_lock(Tid, Oid, Lock, Default), {granted, Val} catch _:_Reason -> %% Table has been deleted from this node, @@ -296,87 +303,71 @@ grant_lock(Tid, read, Lock, Oid = {Tab, Key}) {not_granted, C} end end; -grant_lock(Tid, {ix_read,IxKey,Pos}, Lock, Oid = {Tab, _}) -> +grant_lock(Tid, {ix_read,IxKey,Pos}, Lock, Oid = {Tab, _}, Default) -> try Res = ix_read_res(Tab, IxKey,Pos), - set_lock(Tid, Oid, Lock), + set_lock(Tid, Oid, Lock, Default), {granted, Res, [?ALL]} catch _:_ -> {not_granted, {no_exists, Tab, {index, [Pos]}}} end; -grant_lock(Tid, read, Lock, Oid) -> - set_lock(Tid, Oid, Lock), +grant_lock(Tid, read, Lock, Oid, Default) -> + set_lock(Tid, Oid, Lock, Default), {granted, ok}; -grant_lock(Tid, write, Lock, Oid) -> - set_lock(Tid, Oid, Lock), +grant_lock(Tid, write, Lock, Oid, Default) -> + set_lock(Tid, Oid, Lock, Default), granted. %% 1) Impose an ordering on all transactions favour old (low tid) transactions %% newer (higher tid) transactions may never wait on older ones, %% 2) When releasing the tids from the queue always begin with youngest (high tid) %% because of 1) it will avoid the deadlocks. -%% 3) TabLocks is the problem :-) They should not starve and not deadlock +%% 3) TabLocks is the problem :-) They should not starve and not deadlock %% handle tablocks in queue as they had locks on unlocked records. -can_lock(Tid, read, {Tab, Key}, AlreadyQ) when Key /= ?ALL -> - %% The key is bound, no need for the other BIF - Oid = {Tab, Key}, - ObjLocks = ?ets_match_object(mnesia_held_locks, {Oid, write, '_'}), - TabLocks = ?ets_match_object(mnesia_held_locks, {{Tab, ?ALL}, write, '_'}), - check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, read); +can_lock(Tid, read, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL -> + ObjLocks = ?ets_lookup(mnesia_held_locks, Oid), + TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}), + {check_lock(Tid, Oid, + filter_write(ObjLocks), + filter_write(TabLocks), + yes, AlreadyQ, read), + ObjLocks}; can_lock(Tid, read, Oid, AlreadyQ) -> % Whole tab Tab = element(1, Oid), ObjLocks = ?ets_match_object(mnesia_held_locks, {{Tab, '_'}, write, '_'}), - check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, read); + {check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, read), undefined}; -can_lock(Tid, write, {Tab, Key}, AlreadyQ) when Key /= ?ALL -> - Oid = {Tab, Key}, +can_lock(Tid, write, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL -> ObjLocks = ?ets_lookup(mnesia_held_locks, Oid), TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}), - check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write); + {check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write), ObjLocks}; can_lock(Tid, write, Oid, AlreadyQ) -> % Whole tab Tab = element(1, Oid), ObjLocks = ?ets_match_object(mnesia_held_locks, ?match_oid_held_locks({Tab, '_'})), - check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, write). + {check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, write), undefined}. + +filter_write([{_, read, _}]) -> []; +filter_write(Res) -> Res. %% Check held locks for conflicting locks -check_lock(Tid, Oid, [Lock | Locks], TabLocks, X, AlreadyQ, Type) -> - case element(3, Lock) of - Tid -> - check_lock(Tid, Oid, Locks, TabLocks, X, AlreadyQ, Type); - WaitForTid -> - Queue = allowed_to_be_queued(WaitForTid,Tid), - if Queue == true -> - check_lock(Tid, Oid, Locks, TabLocks, {queue, WaitForTid}, AlreadyQ, Type); - Tid#tid.pid == WaitForTid#tid.pid -> - dbg_out("Spurious lock conflict ~w ~w: ~w -> ~w~n", - [Oid, Lock, Tid, WaitForTid]), - %% Test.. - {Tab, _Key} = Oid, - HaveQ = (ets:lookup(mnesia_lock_queue, Oid) /= []) - orelse (ets:lookup(mnesia_lock_queue,{Tab,?ALL}) /= []), - if - HaveQ -> - {no, WaitForTid}; - true -> - check_lock(Tid,Oid,Locks,TabLocks,{queue,WaitForTid},AlreadyQ,Type) - end; - %%{no, WaitForTid}; Safe solution - true -> - {no, WaitForTid} - end +check_lock(Tid, Oid, [{_, _, Lock} | Locks], TabLocks, _X, AlreadyQ, Type) -> + case can_queue(Lock, Tid, Oid, _X) of + {no, _} = Res -> + Res; + Res -> + check_lock(Tid, Oid, Locks, TabLocks, Res, AlreadyQ, Type) end; check_lock(_, _, [], [], X, {queue, bad_luck}, _) -> X; %% The queue should be correct already no need to check it again check_lock(_, _, [], [], X = {queue, _Tid}, _AlreadyQ, _) -> - X; + X; -check_lock(Tid, Oid, [], [], X, AlreadyQ, Type) -> - {Tab, Key} = Oid, +check_lock(Tid, Oid = {Tab, Key}, [], [], X, AlreadyQ, Type) -> if Type == write -> check_queue(Tid, Tab, X, AlreadyQ); @@ -387,7 +378,7 @@ check_lock(Tid, Oid, [], [], X, AlreadyQ, Type) -> %% If there is a queue on that object, read_lock shouldn't be granted ObjLocks = ets:lookup(mnesia_lock_queue, Oid), case max(ObjLocks) of - empty -> + empty -> check_queue(Tid, Tab, X, AlreadyQ); ObjL -> case allowed_to_be_queued(ObjL,Tid) of @@ -403,16 +394,36 @@ check_lock(Tid, Oid, [], [], X, AlreadyQ, Type) -> check_lock(Tid, Oid, [], TabLocks, X, AlreadyQ, Type) -> check_lock(Tid, Oid, TabLocks, [], X, AlreadyQ, Type). +can_queue([{_Op, Tid}|Locks], Tid, Oid, Res) -> + can_queue(Locks, Tid, Oid, Res); +can_queue([{Op, WaitForTid}|Locks], Tid, Oid = {Tab, _}, _) -> + case allowed_to_be_queued(WaitForTid,Tid) of + true when Tid#tid.pid == WaitForTid#tid.pid -> + dbg_out("Spurious lock conflict ~w ~w: ~w -> ~w~n", + [Oid, Op, Tid, WaitForTid]), + HaveQ = (ets:lookup(mnesia_lock_queue, Oid) /= []) + orelse (ets:lookup(mnesia_lock_queue,{Tab,?ALL}) /= []), + case HaveQ of + true -> {no, WaitForTid}; + false -> can_queue(Locks, Tid, Oid, {queue, WaitForTid}) + end; + true -> + can_queue(Locks, Tid, Oid, {queue, WaitForTid}); + false -> + {no, WaitForTid} + end; +can_queue([], _, _, Res) -> Res. + %% True if WaitForTid > Tid -> % Important order allowed_to_be_queued(WaitForTid, Tid) -> case get(pid_sort_order) of undefined -> WaitForTid > Tid; - r9b_plain -> + r9b_plain -> cmp_tid(true, WaitForTid, Tid) =:= 1; - standard -> + standard -> cmp_tid(false, WaitForTid, Tid) =:= 1 - end. - + end. + %% Check queue for conflicting locks %% Assume that all queued locks belongs to other tid's @@ -421,25 +432,25 @@ check_queue(Tid, Tab, X, AlreadyQ) -> Greatest = max(TabLocks), case Greatest of empty -> X; - Tid -> X; - WaitForTid -> + Tid -> X; + WaitForTid -> case allowed_to_be_queued(WaitForTid,Tid) of true -> {queue, WaitForTid}; - false when AlreadyQ =:= {no, bad_luck} -> + false when AlreadyQ =:= {no, bad_luck} -> {no, WaitForTid} end end. sort_queue(QL) -> case get(pid_sort_order) of - undefined -> + undefined -> lists:reverse(lists:keysort(#queue.tid, QL)); - r9b_plain -> - lists:sort(fun(#queue{tid=X},#queue{tid=Y}) -> + r9b_plain -> + lists:sort(fun(#queue{tid=X},#queue{tid=Y}) -> cmp_tid(true, X, Y) == 1 end, QL); - standard -> + standard -> lists:sort(fun(#queue{tid=X},#queue{tid=Y}) -> cmp_tid(false, X, Y) == 1 end, QL) @@ -456,22 +467,22 @@ set_read_lock_on_all_keys(Tid, From, Tab, IxKey, Pos) -> Op = {ix_read,IxKey, Pos}, Lock = read, case can_lock(Tid, Lock, Oid, {no, bad_luck}) of - yes -> - Reply = grant_lock(Tid, Op, Lock, Oid), + {yes, Default} -> + Reply = grant_lock(Tid, Op, Lock, Oid, Default), reply(From, Reply); - {no, Lucky} -> + {{no, Lucky},_} -> C = #cyclic{op = Op, lock = Lock, oid = Oid, lucky = Lucky}, ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), reply(From, {not_granted, C}); - {queue, Lucky} -> + {{queue, Lucky},_} -> ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), %% Append to queue: Nice place for trace output - ?ets_insert(mnesia_lock_queue, - #queue{oid = Oid, tid = Tid, op = Op, + ?ets_insert(mnesia_lock_queue, + #queue{oid = Oid, tid = Tid, op = Op, pid = From, lucky = Lucky}), ?ets_insert(mnesia_tid_locks, {Tid, Oid, {queued, Op}}) end. - + %%%%%%%%%%%%%%%%%%%%%%%%%%% %% Release of locks @@ -520,30 +531,40 @@ release_locks([]) -> release_lock({Tid, Oid, {queued, _}}) -> ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = '_', pid = '_', lucky = '_'}); -release_lock({Tid, Oid, Op}) -> - if - Op == write -> - ?ets_delete(mnesia_held_locks, Oid); - Op == read -> - ets:delete_object(mnesia_held_locks, {Oid, Op, Tid}) +release_lock({_Tid, Oid, write}) -> + ?ets_delete(mnesia_held_locks, Oid); +release_lock({Tid, Oid, read}) -> + case ?ets_lookup(mnesia_held_locks, Oid) of + [{Oid, Prev, Locks0}] -> + case remove_tid(Locks0, Tid, []) of + [] -> ?ets_delete(mnesia_held_locks, Oid); + Locks -> ?ets_insert(mnesia_held_locks, {Oid, Prev, Locks}) + end; + [] -> ok end. +remove_tid([{_Op, Tid}|Ls], Tid, Acc) -> + remove_tid(Ls,Tid, Acc); +remove_tid([Keep|Ls], Tid, Acc) -> + remove_tid(Ls,Tid, [Keep|Acc]); +remove_tid([], _, Acc) -> Acc. + rearrange_queue([{_Tid, {Tab, Key}, _} | Locks]) -> if - Key /= ?ALL-> - Queue = - ets:lookup(mnesia_lock_queue, {Tab, ?ALL}) ++ + Key /= ?ALL-> + Queue = + ets:lookup(mnesia_lock_queue, {Tab, ?ALL}) ++ ets:lookup(mnesia_lock_queue, {Tab, Key}), - case Queue of - [] -> + case Queue of + [] -> ok; _ -> Sorted = sort_queue(Queue), try_waiters_obj(Sorted) - end; - true -> + end; + true -> Pat = ?match_oid_lock_queue({Tab, '_'}), - Queue = ?ets_match_object(mnesia_lock_queue, Pat), + Queue = ?ets_match_object(mnesia_lock_queue, Pat), Sorted = sort_queue(Queue), try_waiters_tab(Sorted) end, @@ -556,7 +577,7 @@ try_waiters_obj([W | Waiters]) -> case try_waiter(W) of queued -> no; - _ -> + _ -> try_waiters_obj(Waiters) end; try_waiters_obj([]) -> @@ -573,10 +594,10 @@ try_waiters_tab([W | Waiters]) -> end; Oid -> case try_waiter(W) of - queued -> + queued -> Rest = key_delete_all(Oid, #queue.oid, Waiters), try_waiters_tab(Rest); - _ -> + _ -> try_waiters_tab(Waiters) end end; @@ -592,22 +613,22 @@ try_waiter({queue, Oid, Tid, Op, ReplyTo, _}) -> try_waiter(Oid, Op, SimpleOp, Lock, ReplyTo, Tid) -> case can_lock(Tid, Lock, Oid, {queue, bad_luck}) of - yes -> + {yes, Default} -> %% Delete from queue: Nice place for trace output - ?ets_match_delete(mnesia_lock_queue, + ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = Op, pid = ReplyTo, lucky = '_'}), - Reply = grant_lock(Tid, SimpleOp, Lock, Oid), + Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default), reply(ReplyTo,Reply), locked; - {queue, _Why} -> + {{queue, _Why}, _} -> ?dbg("Keep ~p ~p ~p ~p~n", [Tid, Oid, Lock, _Why]), - queued; % Keep waiter in queue - {no, Lucky} -> + queued; % Keep waiter in queue + {{no, Lucky}, _} -> C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky}, verbose("** WARNING ** Restarted transaction, possible deadlock in lock queue ~w: cyclic = ~w~n", [Tid, C]), - ?ets_match_delete(mnesia_lock_queue, + ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = Op, pid = ReplyTo, lucky = '_'}), Reply = {not_granted, C}, @@ -645,7 +666,7 @@ mnesia_down(N, Pending) -> Pid ! {release_remote_non_pending, N, Pending} end. -%% Aquire a write lock, but do a read, used by +%% Aquire a write lock, but do a read, used by %% mnesia:wread/1 rwlock(Tid, Store, Oid) -> @@ -657,9 +678,10 @@ rwlock(Tid, Store, Oid) -> Lock = write, case need_lock(Store, Tab, Key, Lock) of yes -> - {Ns, Majority} = w_nodes(Tab), + {Ns0, Majority} = w_nodes(Tab), + Ns = [Node|lists:delete(Node,Ns0)], check_majority(Majority, Tab, Ns), - Res = get_rwlocks_on_nodes(Ns, rwlock, Node, Store, Tid, Oid), + Res = get_rwlocks_on_nodes(Ns, make_ref(), Store, Tid, Oid), ?ets_insert(Store, {{locks, Tab, Key}, Lock}), Res; no -> @@ -718,7 +740,7 @@ sticky_rwlock(Tid, Store, Oid) -> sticky_lock(Tid, Store, Oid, read_write). sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) -> - N = val({Tab, where_to_read}), + N = val({Tab, where_to_read}), if node() == N -> case need_lock(Store, Tab, Key, write) of @@ -805,9 +827,9 @@ sticky_wlock_table(Tid, Store, Tab) -> %% aquire a wlock on Oid %% We store a {Tabname, write, Tid} in all locktables %% on all nodes containing a copy of Tabname -%% We also store an item {{locks, Tab, Key}, write} in the +%% We also store an item {{locks, Tab, Key}, write} in the %% local store when we have aquired the lock. -%% +%% wlock(Tid, Store, Oid) -> wlock(Tid, Store, Oid, _CheckMajority = true). @@ -845,10 +867,10 @@ wlock_no_exist(Tid, Store, Tab, Ns) -> need_lock(Store, Tab, Key, LockPattern) -> TabL = ?ets_match_object(Store, {{locks, Tab, ?ALL}, LockPattern}), - if + if TabL == [] -> KeyL = ?ets_match_object(Store, {{locks, Tab, Key}, LockPattern}), - if + if KeyL == [] -> yes; true -> @@ -865,7 +887,7 @@ del_debug() -> erase(mnesia_wlock_nodes). %% We first send lock request to the local node if it is part of the lockers -%% then the first sorted node then to the rest of the lockmanagers on all +%% then the first sorted node then to the rest of the lockmanagers on all %% nodes holding a copy of the table get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) -> @@ -875,51 +897,31 @@ get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) -> case node() of Node -> %% Local done try one more get_wlocks_on_nodes(Tail, Orig, Store, Request, Oid); - _ -> %% The first succeded cont with the rest + _ -> %% The first succeded cont with the rest get_wlocks_on_nodes(Tail, Store, Request), receive_wlocks(Tail, Orig, Store, Oid) end; -get_wlocks_on_nodes([], Orig, _Store, _Request, _Oid) -> +get_wlocks_on_nodes([], Orig, _Store, _Request, _Oid) -> Orig. get_wlocks_on_nodes([Node | Tail], Store, Request) -> {?MODULE, Node} ! Request, ?ets_insert(Store,{nodes, Node}), get_wlocks_on_nodes(Tail, Store, Request); -get_wlocks_on_nodes([], _, _) -> +get_wlocks_on_nodes([], _, _) -> ok. -get_rwlocks_on_nodes([ReadNode|Tail], _Res, ReadNode, Store, Tid, Oid) -> +get_rwlocks_on_nodes([ReadNode|Tail], Ref, Store, Tid, Oid) -> Op = {self(), {read_write, Tid, Oid}}, {?MODULE, ReadNode} ! Op, ?ets_insert(Store, {nodes, ReadNode}), - Res = receive_wlocks([ReadNode], undefined, Store, Oid), - case node() of - ReadNode -> - get_rwlocks_on_nodes(Tail, Res, ReadNode, Store, Tid, Oid); - _ -> - get_wlocks_on_nodes(Tail, Store, {self(), {write, Tid, Oid}}), - receive_wlocks(Tail, Res, Store, Oid) + case receive_wlocks([ReadNode], Ref, Store, Oid) of + Ref -> + get_rwlocks_on_nodes(Tail, Ref, Store, Tid, Oid); + Res -> + get_wlocks_on_nodes(Tail, Res, Store, {self(), {write, Tid, Oid}}, Oid) end; -get_rwlocks_on_nodes([Node | Tail], Res, ReadNode, Store, Tid, Oid) -> - Op = {self(), {write, Tid, Oid}}, - {?MODULE, Node} ! Op, - ?ets_insert(Store, {nodes, Node}), - receive_wlocks([Node], undefined, Store, Oid), - if node() == Node -> - get_rwlocks_on_nodes(Tail, Res, ReadNode, Store, Tid, Oid); - Res == rwlock -> %% Hmm - Rest = lists:delete(ReadNode, Tail), - Op2 = {self(), {read_write, Tid, Oid}}, - {?MODULE, ReadNode} ! Op2, - ?ets_insert(Store, {nodes, ReadNode}), - get_wlocks_on_nodes(Rest, Store, {self(), {write, Tid, Oid}}), - receive_wlocks([ReadNode|Rest], undefined, Store, Oid); - true -> - get_wlocks_on_nodes(Tail, Store, {self(), {write, Tid, Oid}}), - receive_wlocks(Tail, Res, Store, Oid) - end; -get_rwlocks_on_nodes([],Res,_,_,_,_) -> +get_rwlocks_on_nodes([],Res,_,_,_) -> Res. receive_wlocks([], Res, _Store, _Oid) -> @@ -944,8 +946,8 @@ receive_wlocks(Nodes = [This|Ns], Res, Store, Oid) -> Tail = lists:delete(Node,Nodes), Nonstuck = lists:delete(Sticky,Tail), [?ets_insert(Store, {nodes, NSNode}) || NSNode <- Nonstuck], - case lists:member(Sticky,Tail) of - true -> + case lists:member(Sticky,Tail) of + true -> sticky_flush(Nonstuck,Store), receive_wlocks([Sticky], Res, Store, Oid); false -> @@ -957,7 +959,7 @@ receive_wlocks(Nodes = [This|Ns], Res, Store, Oid) -> flush_remaining(Ns, This, Reason1) end. -sticky_flush([], _) -> +sticky_flush([], _) -> del_debug(), ok; sticky_flush(Ns=[Node | Tail], Store) -> @@ -991,7 +993,7 @@ opt_lookup_in_client(lookup_in_client, Oid, Lock) -> %% Table has been deleted from this node, %% restart the transaction. #cyclic{op = read, lock = Lock, oid = Oid, lucky = nowhere}; - Val -> + Val -> Val end; opt_lookup_in_client(Val, _Oid, _Lock) -> @@ -1000,8 +1002,8 @@ opt_lookup_in_client(Val, _Oid, _Lock) -> return_granted_or_nodes({_, ?ALL} , Nodes) -> Nodes; return_granted_or_nodes({?GLOBAL, _}, Nodes) -> Nodes; return_granted_or_nodes(_ , _Nodes) -> granted. - -%% We store a {Tab, read, From} item in the + +%% We store a {Tab, read, From} item in the %% locks table on the node where we actually do pick up the object %% and we also store an item {lock, Oid, read} in our local store %% so that we can release any locks we hold when we commit. @@ -1059,9 +1061,9 @@ rlock_get_reply(Node, Store, Oid, {granted, V}) -> ?ets_insert(Store, {{locks, Tab, Key}, read}), ?ets_insert(Store, {nodes, Node}), case opt_lookup_in_client(V, Oid, read) of - C = #cyclic{} -> + C = #cyclic{} -> mnesia:abort(C); - Val -> + Val -> Val end; rlock_get_reply(Node, Store, Oid, granted) -> @@ -1079,7 +1081,7 @@ rlock_get_reply(Node, Store, Tab, {granted, V, RealKeys}) -> rlock_get_reply(_Node, _Store, _Oid, {not_granted, Reason}) -> exit({aborted, Reason}); -rlock_get_reply(_Node, Store, Oid, {switch, N2, Req}) -> +rlock_get_reply(_Node, Store, Oid, {switch, N2, Req}) -> ?ets_insert(Store, {nodes, N2}), {?MODULE, N2} ! Req, rlock_get_reply(N2, Store, Oid, l_req_rec(N2, Store)). @@ -1095,7 +1097,7 @@ ixrlock(Tid, Store, Tab, IxKey, Pos) -> %%% Old code %% R = l_request(Node, {ix_read, Tid, Tab, IxKey, Pos}, Store), %% rlock_get_reply(Node, Store, Tab, R) - + case need_lock(Store, Tab, ?ALL, read) of no when Node =:= node() -> ix_read_res(Tab,IxKey,Pos); @@ -1135,11 +1137,23 @@ rec_requests([], _Oid, _Store) -> get_held_locks() -> ?MODULE ! {get_table, self(), mnesia_held_locks}, - receive {mnesia_held_locks, Locks} -> Locks end. + Locks = receive {mnesia_held_locks, Ls} -> Ls after 5000 -> [] end, + rewrite_locks(Locks, []). + +rewrite_locks([{Oid, _, Ls}|Locks], Acc0) -> + Acc = rewrite_locks(Ls, Oid, Acc0), + rewrite_locks(Locks, Acc); +rewrite_locks([], Acc) -> + lists:reverse(Acc). + +rewrite_locks([{Op, Tid}|Ls], Oid, Acc) -> + rewrite_locks(Ls, Oid, [{Oid, Op, Tid}|Acc]); +rewrite_locks([], _, Acc) -> + Acc. get_lock_queue() -> ?MODULE ! {get_table, self(), mnesia_lock_queue}, - Q = receive {mnesia_lock_queue, Locks} -> Locks end, + Q = receive {mnesia_lock_queue, Locks} -> Locks after 5000 -> [] end, [{Oid, Op, Pid, Tid, WFT} || {queue, Oid, Tid, Op, Pid, WFT} <- Q]. do_stop() -> |