%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1996-2018. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% -module(mnesia_locker). -export([ get_held_locks/0, get_held_locks/1, get_lock_queue/0, global_lock/5, ixrlock/5, init/1, release_tid/1, mnesia_down/2, async_release_tid/2, send_release_tid/2, receive_release_tid_acc/2, rlock/3, rlock_table/3, rwlock/3, sticky_rwlock/3, start/0, sticky_wlock/3, sticky_wlock_table/3, wlock/3, wlock_no_exist/4, wlock_table/3, load_lock_table/3 ]). %% sys callback functions -export([system_continue/3, system_terminate/4, system_code_change/4 ]). -compile({no_auto_import,[error/2]}). -include("mnesia.hrl"). -import(mnesia_lib, [dbg_out/2, error/2, verbose/2]). -define(dbg(S,V), ok). %-define(dbg(S,V), dbg_out("~p:~p: " ++ S, [?MODULE, ?LINE] ++ V)). -define(ALL, '______WHOLETABLE_____'). -define(STICK, '______STICK_____'). -define(GLOBAL, '______GLOBAL_____'). -record(state, {supervisor}). -record(queue, {oid, tid, op, pid, lucky}). %% mnesia_held_locks: contain {Oid, MaxLock, [{Op, Tid}]} entries -define(match_oid_held_locks(Oid), {Oid, '_', '_'}). %% mnesia_tid_locks: contain {Tid, Oid, Op} entries (bag) -define(match_oid_tid_locks(Tid), {Tid, '_', '_'}). %% mnesia_sticky_locks: contain {Oid, Node} entries and {Tab, Node} entries (set) -define(match_oid_sticky_locks(Oid),{Oid, '_'}). %% mnesia_lock_queue: contain {queue, Oid, Tid, Op, ReplyTo, WaitForTid} entries (bag) -define(match_oid_lock_queue(Oid), #queue{oid=Oid, tid='_', op = '_', pid = '_', lucky = '_'}). %% mnesia_lock_counter: {{write, Tab}, Number} && %% {{read, Tab}, Number} entries (set) start() -> mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [self()]). init(Parent) -> register(?MODULE, self()), process_flag(trap_exit, true), ?ets_new_table(mnesia_held_locks, [ordered_set, private, named_table]), ?ets_new_table(mnesia_tid_locks, [ordered_set, private, named_table]), ?ets_new_table(mnesia_sticky_locks, [set, private, named_table]), ?ets_new_table(mnesia_lock_queue, [bag, private, named_table, {keypos, 2}]), proc_lib:init_ack(Parent, {ok, self()}), case ?catch_val(pid_sort_order) of r9b_plain -> put(pid_sort_order, r9b_plain); standard -> put(pid_sort_order, standard); _ -> ignore end, loop(#state{supervisor = Parent}). %% Local function in order to avoid external function call val(Var) -> case ?catch_val_and_stack(Var) of {'EXIT', Stacktrace} -> mnesia_lib:other_val(Var, Stacktrace); Value -> Value end. reply(From, R) -> From ! {?MODULE, node(), R}, true. %% Quiets dialyzer l_request(Node, X, Store) -> {?MODULE, Node} ! {self(), X}, l_req_rec(Node, Store). l_req_rec(Node, Store) -> ?ets_insert(Store, {nodes, Node}), receive {?MODULE, Node, Reply} -> Reply; {mnesia_down, Node} -> {not_granted, {node_not_running, Node}} end. release_tid(Tid) -> ?MODULE ! {release_tid, Tid}. async_release_tid(Nodes, Tid) -> rpc:abcast(Nodes, ?MODULE, {release_tid, Tid}). send_release_tid(Nodes, Tid) -> rpc:abcast(Nodes, ?MODULE, {self(), {sync_release_tid, Tid}}). receive_release_tid_acc([Node | Nodes], Tid) -> receive {?MODULE, Node, {tid_released, Tid}} -> receive_release_tid_acc(Nodes, Tid) after 0 -> receive {?MODULE, Node, {tid_released, Tid}} -> receive_release_tid_acc(Nodes, Tid); {mnesia_down, Node} -> receive_release_tid_acc(Nodes, Tid) end end; receive_release_tid_acc([], _Tid) -> ok. mnesia_down(Node, Pending) -> case whereis(?MODULE) of undefined -> {error, node_not_running}; Pid -> Ref = make_ref(), Pid ! {{self(), Ref}, {release_remote_non_pending, Node, Pending}}, receive %% No need to wait for anything else if process dies we die soon {Ref,ok} -> ok end end. loop(State) -> receive {From, {write, Tid, Oid}} -> try_sticky_lock(Tid, write, From, Oid), loop(State); %% If Key == ?ALL it's a request to lock the entire table %% {From, {read, Tid, Oid}} -> try_sticky_lock(Tid, read, From, Oid), loop(State); %% Really do a read, but get hold of a write lock %% used by mnesia:wread(Oid). {From, {read_write, Tid, Oid}} -> try_sticky_lock(Tid, read_write, From, Oid), loop(State); %% Tid has somehow terminated, clear up everything %% and pass locks on to queued processes. %% This is the purpose of the mnesia_tid_locks table {release_tid, Tid} -> do_release_tid(Tid), loop(State); %% stick lock, first tries this to the where_to_read Node {From, {test_set_sticky, Tid, {Tab, _} = Oid, Lock}} -> case ?ets_lookup(mnesia_sticky_locks, Tab) of [] -> reply(From, not_stuck), loop(State); [{_,Node}] when Node == node() -> %% Lock is stuck here, see now if we can just set %% a regular write lock try_lock(Tid, Lock, From, Oid), loop(State); [{_,Node}] -> reply(From, {stuck_elsewhere, Node}), loop(State) end; %% If test_set_sticky fails, we send this to all nodes %% after aquiring a real write lock on Oid {stick, {Tab, _}, N} -> ?ets_insert(mnesia_sticky_locks, {Tab, N}), loop(State); %% The caller which sends this message, must have first %% aquired a write lock on the entire table {unstick, Tab} -> ?ets_delete(mnesia_sticky_locks, Tab), loop(State); {From, {ix_read, Tid, Tab, IxKey, Pos}} -> case ?ets_lookup(mnesia_sticky_locks, Tab) of [] -> set_read_lock_on_all_keys(Tid,From,Tab,IxKey,Pos), loop(State); [{_,N}] when N == node() -> set_read_lock_on_all_keys(Tid,From,Tab,IxKey,Pos), loop(State); [{_,N}] -> Req = {From, {ix_read, Tid, Tab, IxKey, Pos}}, From ! {?MODULE, node(), {switch, N, Req}}, loop(State) end; {From, {sync_release_tid, Tid}} -> do_release_tid(Tid), reply(From, {tid_released, Tid}), loop(State); {{From, Ref},{release_remote_non_pending, Node, Pending}} -> release_remote_non_pending(Node, Pending), From ! {Ref, ok}, loop(State); {From, {is_locked, Oid}} -> Held = ?ets_lookup(mnesia_held_locks, Oid), reply(From, Held), loop(State); {'EXIT', Pid, _} when Pid == State#state.supervisor -> do_stop(); {system, From, Msg} -> verbose("~p got {system, ~p, ~tp}~n", [?MODULE, From, Msg]), Parent = State#state.supervisor, sys:handle_system_msg(Msg, From, Parent, ?MODULE, [], State); {get_table, From, LockTable} -> From ! {LockTable, ?ets_match_object(LockTable, '_')}, loop(State); Msg -> error("~p got unexpected message: ~tp~n", [?MODULE, Msg]), loop(State) end. set_lock(Tid, Oid, Op, []) -> ?ets_insert(mnesia_tid_locks, {{Tid, Oid, Op}}), ?ets_insert(mnesia_held_locks, {Oid, Op, [{Op, Tid}]}); set_lock(Tid, Oid, read, [{Oid, Prev, Items}]) -> ?ets_insert(mnesia_tid_locks, {{Tid, Oid, read}}), ?ets_insert(mnesia_held_locks, {Oid, Prev, [{read, Tid}|Items]}); set_lock(Tid, Oid, write, [{Oid, _Prev, Items}]) -> ?ets_insert(mnesia_tid_locks, {{Tid, Oid, write}}), ?ets_insert(mnesia_held_locks, {Oid, write, [{write, Tid}|Items]}); set_lock(Tid, Oid, Op, undefined) -> set_lock(Tid, Oid, Op, ?ets_lookup(mnesia_held_locks, Oid)). %%%%%%%%%%%%%%%%%%%%%%%%%%% %% Acquire locks try_sticky_lock(Tid, Op, Pid, {Tab, _} = Oid) -> case ?ets_lookup(mnesia_sticky_locks, Tab) of [] -> try_lock(Tid, Op, Pid, Oid); [{_,N}] when N == node() -> try_lock(Tid, Op, Pid, Oid); [{_,N}] -> Req = {Pid, {Op, Tid, Oid}}, Pid ! {?MODULE, node(), {switch, N, Req}}, true end. try_lock(Tid, read_write, Pid, Oid) -> try_lock(Tid, read_write, read, write, Pid, Oid); try_lock(Tid, Op, Pid, Oid) -> try_lock(Tid, Op, Op, Op, Pid, Oid). try_lock(Tid, Op, SimpleOp, Lock, Pid, Oid) -> case can_lock(Tid, Lock, Oid, {no, bad_luck}) of {yes, Default} -> Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default), reply(Pid, Reply); {{no, Lucky},_} -> C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky}, ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), reply(Pid, {not_granted, C}); {{queue, Lucky},_} -> ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), %% Append to queue: Nice place for trace output ?ets_insert(mnesia_lock_queue, #queue{oid = Oid, tid = Tid, op = Op, pid = Pid, lucky = Lucky}), ?ets_insert(mnesia_tid_locks, {{Tid, Oid, {queued, Op}}}) end. grant_lock(Tid, read, Lock, Oid = {Tab, Key}, Default) when Key /= ?ALL, Tab /= ?GLOBAL -> case node(Tid#tid.pid) == node() of true -> set_lock(Tid, Oid, Lock, Default), {granted, lookup_in_client}; false -> try Val = mnesia_lib:db_get(Tab, Key), %% lookup as well set_lock(Tid, Oid, Lock, Default), {granted, Val} catch _:_Reason -> %% Table has been deleted from this node, %% restart the transaction. C = #cyclic{op = read, lock = Lock, oid = Oid, lucky = nowhere}, {not_granted, C} end end; grant_lock(Tid, {ix_read,IxKey,Pos}, Lock, Oid = {Tab, _}, Default) -> try Res = ix_read_res(Tab, IxKey,Pos), set_lock(Tid, Oid, Lock, Default), {granted, Res, [?ALL]} catch _:_ -> {not_granted, {no_exists, Tab, {index, [Pos]}}} end; grant_lock(Tid, read, Lock, Oid, Default) -> set_lock(Tid, Oid, Lock, Default), {granted, ok}; grant_lock(Tid, write, Lock, Oid, Default) -> set_lock(Tid, Oid, Lock, Default), granted. %% 1) Impose an ordering on all transactions favour old (low tid) transactions %% newer (higher tid) transactions may never wait on older ones, %% 2) When releasing the tids from the queue always begin with youngest (high tid) %% because of 1) it will avoid the deadlocks. %% 3) TabLocks is the problem :-) They should not starve and not deadlock %% handle tablocks in queue as they had locks on unlocked records. can_lock(Tid, read, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL -> ObjLocks = ?ets_lookup(mnesia_held_locks, Oid), TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}), {check_lock(Tid, Oid, filter_write(ObjLocks), filter_write(TabLocks), yes, AlreadyQ, read), ObjLocks}; can_lock(Tid, read, Oid, AlreadyQ) -> % Whole tab Tab = element(1, Oid), ObjLocks = ?ets_match_object(mnesia_held_locks, {{Tab, '_'}, write, '_'}), {check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, read), undefined}; can_lock(Tid, write, Oid = {Tab, Key}, AlreadyQ) when Key /= ?ALL -> ObjLocks = ?ets_lookup(mnesia_held_locks, Oid), TabLocks = ?ets_lookup(mnesia_held_locks, {Tab, ?ALL}), {check_lock(Tid, Oid, ObjLocks, TabLocks, yes, AlreadyQ, write), ObjLocks}; can_lock(Tid, write, Oid, AlreadyQ) -> % Whole tab Tab = element(1, Oid), ObjLocks = ?ets_match_object(mnesia_held_locks, ?match_oid_held_locks({Tab, '_'})), {check_lock(Tid, Oid, ObjLocks, [], yes, AlreadyQ, write), undefined}. filter_write([{_, read, _}]) -> []; filter_write(Res) -> Res. %% Check held locks for conflicting locks check_lock(Tid, Oid, [{_, _, Lock} | Locks], TabLocks, _X, AlreadyQ, Type) -> case can_queue(Lock, Tid, Oid, _X) of {no, _} = Res -> Res; Res -> check_lock(Tid, Oid, Locks, TabLocks, Res, AlreadyQ, Type) end; check_lock(_, _, [], [], X, {queue, bad_luck}, _) -> X; %% The queue should be correct already no need to check it again check_lock(_, _, [], [], X = {queue, _Tid}, _AlreadyQ, _) -> X; check_lock(Tid, Oid = {Tab, Key}, [], [], X, AlreadyQ, Type) -> if Type == write -> check_queue(Tid, Tab, X, AlreadyQ); Key == ?ALL -> %% hmm should be solvable by a clever select expr but not today... check_queue(Tid, Tab, X, AlreadyQ); true -> %% If there is a queue on that object, read_lock shouldn't be granted ObjLocks = ets:lookup(mnesia_lock_queue, Oid), case max(ObjLocks) of empty -> check_queue(Tid, Tab, X, AlreadyQ); ObjL -> case allowed_to_be_queued(ObjL,Tid) of false -> %% Starvation Preemption (write waits for read) {no, ObjL}; true -> check_queue(Tid, Tab, {queue, ObjL}, AlreadyQ) end end end; check_lock(Tid, Oid, [], TabLocks, X, AlreadyQ, Type) -> check_lock(Tid, Oid, TabLocks, [], X, AlreadyQ, Type). can_queue([{_Op, Tid}|Locks], Tid, Oid, Res) -> can_queue(Locks, Tid, Oid, Res); can_queue([{Op, WaitForTid}|Locks], Tid, Oid = {Tab, _}, _) -> case allowed_to_be_queued(WaitForTid,Tid) of true when Tid#tid.pid == WaitForTid#tid.pid -> dbg_out("Spurious lock conflict ~w ~w: ~w -> ~w~n", [Oid, Op, Tid, WaitForTid]), HaveQ = (ets:lookup(mnesia_lock_queue, Oid) /= []) orelse (ets:lookup(mnesia_lock_queue,{Tab,?ALL}) /= []), case HaveQ of true -> {no, WaitForTid}; false -> can_queue(Locks, Tid, Oid, {queue, WaitForTid}) end; true -> can_queue(Locks, Tid, Oid, {queue, WaitForTid}); false -> {no, WaitForTid} end; can_queue([], _, _, Res) -> Res. %% True if WaitForTid > Tid -> % Important order allowed_to_be_queued(WaitForTid, Tid) -> case get(pid_sort_order) of undefined -> WaitForTid > Tid; r9b_plain -> cmp_tid(true, WaitForTid, Tid) =:= 1; standard -> cmp_tid(false, WaitForTid, Tid) =:= 1 end. %% Check queue for conflicting locks %% Assume that all queued locks belongs to other tid's check_queue(Tid, Tab, X, AlreadyQ) -> TabLocks = ets:lookup(mnesia_lock_queue, {Tab,?ALL}), Greatest = max(TabLocks), case Greatest of empty -> X; Tid -> X; WaitForTid -> case allowed_to_be_queued(WaitForTid,Tid) of true -> {queue, WaitForTid}; false when AlreadyQ =:= {no, bad_luck} -> {no, WaitForTid} end end. sort_queue(QL) -> case get(pid_sort_order) of undefined -> lists:reverse(lists:keysort(#queue.tid, QL)); r9b_plain -> lists:sort(fun(#queue{tid=X},#queue{tid=Y}) -> cmp_tid(true, X, Y) == 1 end, QL); standard -> lists:sort(fun(#queue{tid=X},#queue{tid=Y}) -> cmp_tid(false, X, Y) == 1 end, QL) end. max([]) -> empty; max([#queue{tid=Max}]) -> Max; max(L) -> [#queue{tid=Max}|_] = sort_queue(L), Max. set_read_lock_on_all_keys(Tid, From, Tab, IxKey, Pos) -> Oid = {Tab,?ALL}, Op = {ix_read,IxKey, Pos}, Lock = read, case can_lock(Tid, Lock, Oid, {no, bad_luck}) of {yes, Default} -> Reply = grant_lock(Tid, Op, Lock, Oid, Default), reply(From, Reply); {{no, Lucky},_} -> C = #cyclic{op = Op, lock = Lock, oid = Oid, lucky = Lucky}, ?dbg("Rejected ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), reply(From, {not_granted, C}); {{queue, Lucky},_} -> ?dbg("Queued ~p ~p ~p ~p ~n", [Tid, Oid, Lock, Lucky]), %% Append to queue: Nice place for trace output ?ets_insert(mnesia_lock_queue, #queue{oid = Oid, tid = Tid, op = Op, pid = From, lucky = Lucky}), ?ets_insert(mnesia_tid_locks, {{Tid, Oid, {queued, Op}}}) end. %%%%%%%%%%%%%%%%%%%%%%%%%%% %% Release of locks %% Release remote non-pending nodes release_remote_non_pending(Node, Pending) -> %% Clear the mnesia_sticky_locks table first, to avoid %% unnecessary requests to the failing node ?ets_match_delete(mnesia_sticky_locks, {'_' , Node}), %% Then we have to release all locks held by processes %% running at the failed node and also simply remove all %% queue'd requests back to the failed node AllTids0 = ?ets_match(mnesia_tid_locks, {{'$1', '_', '_'}}), AllTids = lists:usort(AllTids0), Tids = [T || [T] <- AllTids, Node == node(T#tid.pid), not lists:member(T, Pending)], do_release_tids(Tids). do_release_tids([Tid | Tids]) -> do_release_tid(Tid), do_release_tids(Tids); do_release_tids([]) -> ok. do_release_tid(Tid) -> Objects = ets:select(mnesia_tid_locks, [{{{Tid, '_', '_'}}, [], ['$_']}]), Locks = lists:map(fun({L}) -> L end, Objects), ?dbg("Release ~p ~p ~n", [Tid, Locks]), [?ets_delete(mnesia_tid_locks, L) || L <- Locks], release_locks(Locks), %% Removed queued locks which has had locks UniqueLocks = keyunique(lists:sort(Locks),[]), rearrange_queue(UniqueLocks). keyunique([{_Tid, Oid, _Op}|R], Acc = [{_, Oid, _}|_]) -> keyunique(R, Acc); keyunique([H|R], Acc) -> keyunique(R, [H|Acc]); keyunique([], Acc) -> Acc. release_locks([Lock | Locks]) -> release_lock(Lock), release_locks(Locks); release_locks([]) -> ok. release_lock({Tid, Oid, {queued, _}}) -> ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = '_', pid = '_', lucky = '_'}); release_lock({_Tid, Oid, write}) -> ?ets_delete(mnesia_held_locks, Oid); release_lock({Tid, Oid, read}) -> case ?ets_lookup(mnesia_held_locks, Oid) of [{Oid, Prev, Locks0}] -> case remove_tid(Locks0, Tid, []) of [] -> ?ets_delete(mnesia_held_locks, Oid); Locks -> ?ets_insert(mnesia_held_locks, {Oid, Prev, Locks}) end; [] -> ok end. remove_tid([{_Op, Tid}|Ls], Tid, Acc) -> remove_tid(Ls,Tid, Acc); remove_tid([Keep|Ls], Tid, Acc) -> remove_tid(Ls,Tid, [Keep|Acc]); remove_tid([], _, Acc) -> Acc. rearrange_queue([{_Tid, {Tab, Key}, _} | Locks]) -> if Key /= ?ALL-> Queue = ets:lookup(mnesia_lock_queue, {Tab, ?ALL}) ++ ets:lookup(mnesia_lock_queue, {Tab, Key}), case Queue of [] -> ok; _ -> Sorted = sort_queue(Queue), try_waiters_obj(Sorted) end; true -> Pat = ?match_oid_lock_queue({Tab, '_'}), Queue = ?ets_match_object(mnesia_lock_queue, Pat), Sorted = sort_queue(Queue), try_waiters_tab(Sorted) end, ?dbg("RearrQ ~p~n", [Queue]), rearrange_queue(Locks); rearrange_queue([]) -> ok. try_waiters_obj([W | Waiters]) -> case try_waiter(W) of queued -> no; _ -> try_waiters_obj(Waiters) end; try_waiters_obj([]) -> ok. try_waiters_tab([W | Waiters]) -> case W#queue.oid of {_Tab, ?ALL} -> case try_waiter(W) of queued -> no; _ -> try_waiters_tab(Waiters) end; Oid -> case try_waiter(W) of queued -> Rest = key_delete_all(Oid, #queue.oid, Waiters), try_waiters_tab(Rest); _ -> try_waiters_tab(Waiters) end end; try_waiters_tab([]) -> ok. try_waiter({queue, Oid, Tid, read_write, ReplyTo, _}) -> try_waiter(Oid, read_write, read, write, ReplyTo, Tid); try_waiter({queue, Oid, Tid, IXR = {ix_read,_,_}, ReplyTo, _}) -> try_waiter(Oid, IXR, IXR, read, ReplyTo, Tid); try_waiter({queue, Oid, Tid, Op, ReplyTo, _}) -> try_waiter(Oid, Op, Op, Op, ReplyTo, Tid). try_waiter(Oid, Op, SimpleOp, Lock, ReplyTo, Tid) -> case can_lock(Tid, Lock, Oid, {queue, bad_luck}) of {yes, Default} -> %% Delete from queue: Nice place for trace output ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = Op, pid = ReplyTo, lucky = '_'}), Reply = grant_lock(Tid, SimpleOp, Lock, Oid, Default), reply(ReplyTo,Reply), locked; {{queue, _Why}, _} -> ?dbg("Keep ~p ~p ~p ~p~n", [Tid, Oid, Lock, _Why]), queued; % Keep waiter in queue {{no, Lucky}, _} -> C = #cyclic{op = SimpleOp, lock = Lock, oid = Oid, lucky = Lucky}, verbose("** WARNING ** Restarted transaction, possible deadlock in lock queue ~w: cyclic = ~w~n", [Tid, C]), ?ets_match_delete(mnesia_lock_queue, #queue{oid=Oid, tid = Tid, op = Op, pid = ReplyTo, lucky = '_'}), Reply = {not_granted, C}, reply(ReplyTo,Reply), removed end. key_delete_all(Key, Pos, TupleList) -> key_delete_all(Key, Pos, TupleList, []). key_delete_all(Key, Pos, [H|T], Ack) when element(Pos, H) == Key -> key_delete_all(Key, Pos, T, Ack); key_delete_all(Key, Pos, [H|T], Ack) -> key_delete_all(Key, Pos, T, [H|Ack]); key_delete_all(_, _, [], Ack) -> lists:reverse(Ack). ix_read_res(Tab,IxKey,Pos) -> Index = mnesia_index:get_index_table(Tab, Pos), Rks = mnesia_lib:elems(2,mnesia_index:db_get(Index, IxKey)), lists:append(lists:map(fun(Real) -> mnesia_lib:db_get(Tab, Real) end, Rks)). %% ********************* end server code ******************** %% The following code executes at the client side of a transactions %% Aquire a write lock, but do a read, used by %% mnesia:wread/1 rwlock(Tid, Store, Oid) -> {Tab, Key} = Oid, case val({Tab, where_to_read}) of nowhere -> mnesia:abort({no_exists, Tab}); Node -> Lock = write, case need_lock(Store, Tab, Key, Lock) of yes -> {Ns0, Majority} = w_nodes(Tab), Ns = [Node|lists:delete(Node,Ns0)], check_majority(Majority, Tab, Ns), Res = get_rwlocks_on_nodes(Ns, make_ref(), Store, Tid, Oid), ?ets_insert(Store, {{locks, Tab, Key}, Lock}), Res; no -> if Key == ?ALL -> element(2, w_nodes(Tab)); Tab == ?GLOBAL -> element(2, w_nodes(Tab)); true -> dirty_rpc(Node, Tab, Key, Lock) end end end. %% Return a list of nodes or abort transaction %% WE also insert any additional where_to_write nodes %% in the local store under the key == nodes w_nodes(Tab) -> case ?catch_val({Tab, where_to_wlock}) of {[_ | _], _} = Where -> Where; _ -> mnesia:abort({no_exists, Tab}) end. %% If the table has the 'majority' flag set, we can %% only take a write lock if we see a majority of the %% nodes. check_majority(true, Tab, HaveNs) -> check_majority(Tab, HaveNs); check_majority(false, _, _) -> ok. check_majority(Tab, HaveNs) -> case ?catch_val({Tab, majority}) of true -> case mnesia_lib:have_majority(Tab, HaveNs) of true -> ok; false -> mnesia:abort({no_majority, Tab}) end; _ -> ok end. %% aquire a sticky wlock, a sticky lock is a lock %% which remains at this node after the termination of the %% transaction. sticky_wlock(Tid, Store, Oid) -> sticky_lock(Tid, Store, Oid, write). sticky_rwlock(Tid, Store, Oid) -> sticky_lock(Tid, Store, Oid, read_write). sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) -> N = val({Tab, where_to_read}), if node() == N -> case need_lock(Store, Tab, Key, write) of yes -> do_sticky_lock(Tid, Store, Oid, Lock); no -> dirty_sticky_lock(Tab, Key, [N], Lock) end; true -> mnesia:abort({not_local, Tab}) end. do_sticky_lock(Tid, Store, {Tab, Key} = Oid, Lock) -> {WNodes, Majority} = w_nodes(Tab), sticky_check_majority(Lock, Tab, Majority, WNodes), ?MODULE ! {self(), {test_set_sticky, Tid, Oid, Lock}}, N = node(), receive {?MODULE, N, granted} -> ?ets_insert(Store, {{locks, Tab, Key}, write}), [?ets_insert(Store, {nodes, Node}) || Node <- WNodes], granted; {?MODULE, N, {granted, Val}} -> %% for rwlocks case opt_lookup_in_client(Val, Oid, write) of C = #cyclic{} -> exit({aborted, C}); Val2 -> ?ets_insert(Store, {{locks, Tab, Key}, write}), [?ets_insert(Store, {nodes, Node}) || Node <- WNodes], Val2 end; {?MODULE, N, {not_granted, Reason}} -> exit({aborted, Reason}); {?MODULE, N, not_stuck} -> not_stuck(Tid, Store, Tab, Key, Oid, Lock, N), dirty_sticky_lock(Tab, Key, [N], Lock); {mnesia_down, Node} -> EMsg = {aborted, {node_not_running, Node}}, flush_remaining([N], Node, EMsg); {?MODULE, N, {stuck_elsewhere, _N2}} -> stuck_elsewhere(Tid, Store, Tab, Key, Oid, Lock), dirty_sticky_lock(Tab, Key, [N], Lock) end. sticky_check_majority(W, Tab, true, WNodes) when W==write; W==read_write -> case mnesia_lib:have_majority(Tab, WNodes) of true -> ok; false -> mnesia:abort({no_majority, Tab}) end; sticky_check_majority(_, _, _, _) -> ok. not_stuck(Tid, Store, Tab, _Key, Oid, _Lock, N) -> rlock(Tid, Store, {Tab, ?ALL}), %% needed? wlock(Tid, Store, Oid), %% perfect sync wlock(Tid, Store, {Tab, ?STICK}), %% max one sticker/table Ns = val({Tab, where_to_write}), rpc:abcast(Ns, ?MODULE, {stick, Oid, N}). stuck_elsewhere(Tid, Store, Tab, _Key, Oid, _Lock) -> rlock(Tid, Store, {Tab, ?ALL}), %% needed? wlock(Tid, Store, Oid), %% perfect sync wlock(Tid, Store, {Tab, ?STICK}), %% max one sticker/table Ns = val({Tab, where_to_write}), rpc:abcast(Ns, ?MODULE, {unstick, Tab}). dirty_sticky_lock(Tab, Key, Nodes, Lock) -> if Lock == read_write -> mnesia_lib:db_get(Tab, Key); Key == ?ALL -> Nodes; Tab == ?GLOBAL -> Nodes; true -> ok end. sticky_wlock_table(Tid, Store, Tab) -> sticky_lock(Tid, Store, {Tab, ?ALL}, write). %% aquire a wlock on Oid %% We store a {Tabname, write, Tid} in all locktables %% on all nodes containing a copy of Tabname %% We also store an item {{locks, Tab, Key}, write} in the %% local store when we have aquired the lock. %% wlock(Tid, Store, Oid) -> wlock(Tid, Store, Oid, _CheckMajority = true). wlock(Tid, Store, Oid, CheckMajority) -> {Tab, Key} = Oid, case need_lock(Store, Tab, Key, write) of yes -> {Ns, Majority} = w_nodes(Tab), if CheckMajority -> check_majority(Majority, Tab, Ns); true -> ignore end, Op = {self(), {write, Tid, Oid}}, ?ets_insert(Store, {{locks, Tab, Key}, write}), get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid); no when Key /= ?ALL, Tab /= ?GLOBAL -> []; no -> element(2, w_nodes(Tab)) end. wlock_table(Tid, Store, Tab) -> wlock(Tid, Store, {Tab, ?ALL}). load_lock_table(Tid, Store, Tab) -> wlock(Tid, Store, {Tab, ?ALL}, _CheckMajority = false). %% Write lock even if the table does not exist wlock_no_exist(Tid, Store, Tab, Ns) -> Oid = {Tab, ?ALL}, Op = {self(), {write, Tid, Oid}}, get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid). need_lock(Store, Tab, Key, LockPattern) -> TabL = ?ets_match_object(Store, {{locks, Tab, ?ALL}, LockPattern}), if TabL == [] -> KeyL = ?ets_match_object(Store, {{locks, Tab, Key}, LockPattern}), if KeyL == [] -> yes; true -> no end; true -> no end. add_debug(Nodes) -> % Use process dictionary for debug info put(mnesia_wlock_nodes, Nodes). del_debug() -> erase(mnesia_wlock_nodes). %% We first send lock request to the local node if it is part of the lockers %% then the first sorted node then to the rest of the lockmanagers on all %% nodes holding a copy of the table get_wlocks_on_nodes([Node | Tail], Orig, Store, Request, Oid) -> {?MODULE, Node} ! Request, ?ets_insert(Store, {nodes, Node}), receive_wlocks([Node], undefined, Store, Oid), case node() of Node -> %% Local done try one more get_wlocks_on_nodes(Tail, Orig, Store, Request, Oid); _ -> %% The first succeded cont with the rest get_wlocks_on_nodes(Tail, Store, Request), receive_wlocks(Tail, Orig, Store, Oid) end; get_wlocks_on_nodes([], Orig, _Store, _Request, _Oid) -> Orig. get_wlocks_on_nodes([Node | Tail], Store, Request) -> {?MODULE, Node} ! Request, ?ets_insert(Store,{nodes, Node}), get_wlocks_on_nodes(Tail, Store, Request); get_wlocks_on_nodes([], _, _) -> ok. get_rwlocks_on_nodes([ReadNode|Tail], Ref, Store, Tid, Oid) -> Op = {self(), {read_write, Tid, Oid}}, {?MODULE, ReadNode} ! Op, ?ets_insert(Store, {nodes, ReadNode}), case receive_wlocks([ReadNode], Ref, Store, Oid) of Ref -> get_rwlocks_on_nodes(Tail, Ref, Store, Tid, Oid); Res -> get_wlocks_on_nodes(Tail, Res, Store, {self(), {write, Tid, Oid}}, Oid) end; get_rwlocks_on_nodes([],Res,_,_,_) -> Res. receive_wlocks([], Res, _Store, _Oid) -> del_debug(), Res; receive_wlocks(Nodes = [This|Ns], Res, Store, Oid) -> add_debug(Nodes), receive {?MODULE, Node, granted} -> receive_wlocks(lists:delete(Node,Nodes), Res, Store, Oid); {?MODULE, Node, {granted, Val}} -> %% for rwlocks case opt_lookup_in_client(Val, Oid, write) of C = #cyclic{} -> flush_remaining(Nodes, Node, {aborted, C}); Val2 -> receive_wlocks(lists:delete(Node,Nodes), Val2, Store, Oid) end; {?MODULE, Node, {not_granted, Reason}} -> Reason1 = {aborted, Reason}, flush_remaining(Nodes,Node,Reason1); {?MODULE, Node, {switch, Sticky, _Req}} -> %% for rwlocks Tail = lists:delete(Node,Nodes), Nonstuck = lists:delete(Sticky,Tail), [?ets_insert(Store, {nodes, NSNode}) || NSNode <- Nonstuck], case lists:member(Sticky,Tail) of true -> sticky_flush(Nonstuck,Store), receive_wlocks([Sticky], Res, Store, Oid); false -> sticky_flush(Nonstuck,Store), Res end; {mnesia_down, This} -> % Only look for down from Nodes in list Reason1 = {aborted, {node_not_running, This}}, flush_remaining(Ns, This, Reason1) end. sticky_flush([], _) -> del_debug(), ok; sticky_flush(Ns=[Node | Tail], Store) -> add_debug(Ns), receive {?MODULE, Node, _} -> sticky_flush(Tail, Store); {mnesia_down, Node} -> Reason1 = {aborted, {node_not_running, Node}}, flush_remaining(Tail, Node, Reason1) end. flush_remaining([], _SkipNode, Res) -> del_debug(), exit(Res); flush_remaining(Ns=[SkipNode | Tail ], SkipNode, Res) -> add_debug(Ns), receive {?MODULE, SkipNode, _} -> flush_remaining(Tail, SkipNode, Res) after 0 -> flush_remaining(Tail, SkipNode, Res) end; flush_remaining(Ns=[Node | Tail], SkipNode, Res) -> add_debug(Ns), receive {?MODULE, Node, _} -> flush_remaining(Tail, SkipNode, Res); {mnesia_down, Node} -> flush_remaining(Tail, SkipNode, {aborted, {node_not_running, Node}}) end. opt_lookup_in_client(lookup_in_client, Oid, Lock) -> {Tab, Key} = Oid, try mnesia_lib:db_get(Tab, Key) catch error:_ -> %% Table has been deleted from this node, %% restart the transaction. #cyclic{op = read, lock = Lock, oid = Oid, lucky = nowhere} end; opt_lookup_in_client(Val, _Oid, _Lock) -> Val. return_granted_or_nodes({_, ?ALL} , Nodes) -> Nodes; return_granted_or_nodes({?GLOBAL, _}, Nodes) -> Nodes; return_granted_or_nodes(_ , _Nodes) -> granted. %% We store a {Tab, read, From} item in the %% locks table on the node where we actually do pick up the object %% and we also store an item {lock, Oid, read} in our local store %% so that we can release any locks we hold when we commit. %% This function not only aquires a read lock, but also reads the object %% Oid's are always {Tab, Key} tuples rlock(Tid, Store, Oid) -> {Tab, Key} = Oid, case val({Tab, where_to_read}) of nowhere -> mnesia:abort({no_exists, Tab}); Node -> case need_lock(Store, Tab, Key, '_') of yes -> R = l_request(Node, {read, Tid, Oid}, Store), rlock_get_reply(Node, Store, Oid, R); no -> if Key == ?ALL -> [Node]; Tab == ?GLOBAL -> [Node]; true -> dirty_rpc(Node, Tab, Key, read) end end end. dirty_rpc(nowhere, Tab, Key, _Lock) -> mnesia:abort({no_exists, {Tab, Key}}); dirty_rpc(Node, _Tab, ?ALL, _Lock) -> [Node]; dirty_rpc(Node, ?GLOBAL, _Key, _Lock) -> [Node]; dirty_rpc(Node, Tab, Key, Lock) -> Args = [Tab, Key], case rpc:call(Node, mnesia_lib, db_get, Args) of {badrpc, Reason} -> case val({Tab, where_to_read}) of Node -> ErrorTag = mnesia_lib:dirty_rpc_error_tag(Reason), mnesia:abort({ErrorTag, Args}); _NewNode -> %% Table has been deleted from the node, %% restart the transaction. C = #cyclic{op = read, lock = Lock, oid = {Tab, Key}, lucky = nowhere}, exit({aborted, C}) end; Other -> Other end. rlock_get_reply(Node, Store, Oid, {granted, V}) -> {Tab, Key} = Oid, ?ets_insert(Store, {{locks, Tab, Key}, read}), ?ets_insert(Store, {nodes, Node}), case opt_lookup_in_client(V, Oid, read) of C = #cyclic{} -> mnesia:abort(C); Val -> Val end; rlock_get_reply(Node, Store, Oid, granted) -> {Tab, Key} = Oid, ?ets_insert(Store, {{locks, Tab, Key}, read}), ?ets_insert(Store, {nodes, Node}), return_granted_or_nodes(Oid, [Node]); rlock_get_reply(Node, Store, Tab, {granted, V, RealKeys}) -> %% Kept for backwards compatibility, keep until no old nodes %% are available L = fun(K) -> ?ets_insert(Store, {{locks, Tab, K}, read}) end, lists:foreach(L, RealKeys), ?ets_insert(Store, {nodes, Node}), V; rlock_get_reply(_Node, _Store, _Oid, {not_granted, Reason}) -> exit({aborted, Reason}); rlock_get_reply(_Node, Store, Oid, {switch, N2, Req}) -> ?ets_insert(Store, {nodes, N2}), {?MODULE, N2} ! Req, rlock_get_reply(N2, Store, Oid, l_req_rec(N2, Store)). rlock_table(Tid, Store, Tab) -> rlock(Tid, Store, {Tab, ?ALL}). ixrlock(Tid, Store, Tab, IxKey, Pos) -> case val({Tab, where_to_read}) of nowhere -> mnesia:abort({no_exists, Tab}); Node -> %%% Old code %% R = l_request(Node, {ix_read, Tid, Tab, IxKey, Pos}, Store), %% rlock_get_reply(Node, Store, Tab, R) case need_lock(Store, Tab, ?ALL, read) of no when Node =:= node() -> ix_read_res(Tab,IxKey,Pos); _ -> %% yes or need to get the result from other node R = l_request(Node, {ix_read, Tid, Tab, IxKey, Pos}, Store), rlock_get_reply(Node, Store, Tab, R) end end. %% Grabs the locks or exits global_lock(Tid, Store, Item, write, Ns) -> Oid = {?GLOBAL, Item}, Op = {self(), {write, Tid, Oid}}, get_wlocks_on_nodes(Ns, Ns, Store, Op, Oid); global_lock(Tid, Store, Item, read, Ns) -> Oid = {?GLOBAL, Item}, send_requests(Ns, {read, Tid, Oid}), rec_requests(Ns, Oid, Store), Ns. send_requests([Node | Nodes], X) -> {?MODULE, Node} ! {self(), X}, send_requests(Nodes, X); send_requests([], _X) -> ok. rec_requests([Node | Nodes], Oid, Store) -> Res = l_req_rec(Node, Store), try rlock_get_reply(Node, Store, Oid, Res) of _ -> rec_requests(Nodes, Oid, Store) catch _:Reason -> flush_remaining(Nodes, Node, Reason) end; rec_requests([], _Oid, _Store) -> ok. get_held_locks() -> ?MODULE ! {get_table, self(), mnesia_held_locks}, Locks = receive {mnesia_held_locks, Ls} -> Ls after 5000 -> [] end, rewrite_locks(Locks, []). %% Mnesia internal usage only get_held_locks(Tab) when is_atom(Tab) -> Oid = {Tab, ?ALL}, ?MODULE ! {self(), {is_locked, Oid}}, receive {?MODULE, _Node, Locks} -> case Locks of [] -> []; [{Oid, _Prev, What}] -> What end end. rewrite_locks([{Oid, _, Ls}|Locks], Acc0) -> Acc = rewrite_locks(Ls, Oid, Acc0), rewrite_locks(Locks, Acc); rewrite_locks([], Acc) -> lists:reverse(Acc). rewrite_locks([{Op, Tid}|Ls], Oid, Acc) -> rewrite_locks(Ls, Oid, [{Oid, Op, Tid}|Acc]); rewrite_locks([], _, Acc) -> Acc. get_lock_queue() -> ?MODULE ! {get_table, self(), mnesia_lock_queue}, Q = receive {mnesia_lock_queue, Locks} -> Locks after 5000 -> [] end, [{Oid, Op, Pid, Tid, WFT} || {queue, Oid, Tid, Op, Pid, WFT} <- Q]. do_stop() -> exit(shutdown). %%%%%%%%%%%%%%%%%%%%%%%%%%% %% System upgrade system_continue(_Parent, _Debug, State) -> loop(State). -spec system_terminate(_, _, _, _) -> no_return(). system_terminate(_Reason, _Parent, _Debug, _State) -> do_stop(). system_code_change(State, _Module, _OldVsn, _Extra) -> {ok, State}. %%%%%%%%%%%%%%%%%%%%%%%%%%% %% AXD301 patch sort pids according to R9B sort order %%%%%%%%%%%%%%%%%%%%%%%%%%% %% Om R9B == true, the comparison is done as in R9B plain. %% Om R9B == false, the comparison is done as in any other release. %% cmp_tid(T1, T2) returns -1 if T1 < T2, 0 if T1 = T2 and 1 if T1 > T2. -define(VERSION_MAGIC, 131). -define(ATOM_EXT, 100). -define(PID_EXT, 103). -record(pid_info, {serial, number, nodename, creation}). cmp_tid(R9B, #tid{} = T, #tid{} = T) when R9B == true; R9B == false -> 0; cmp_tid(R9B, #tid{counter = C, pid = Pid1}, #tid{counter = C, pid = Pid2}) when R9B == true; R9B == false -> cmp_pid_info(R9B, pid_to_pid_info(Pid1), pid_to_pid_info(Pid2)); cmp_tid(R9B, #tid{counter = C1}, #tid{counter = C2}) when R9B == true; R9B == false -> cmp(C1, C2). cmp_pid_info(_, #pid_info{} = PI, #pid_info{} = PI) -> 0; cmp_pid_info(false, #pid_info{serial = S, number = N, nodename = NN, creation = C1}, #pid_info{serial = S, number = N, nodename = NN, creation = C2}) -> cmp(C1, C2); cmp_pid_info(false, #pid_info{serial = S, number = N, nodename = NN1}, #pid_info{serial = S, number = N, nodename = NN2}) -> cmp(NN1, NN2); cmp_pid_info(false, #pid_info{serial = S, number = N1}, #pid_info{serial = S, number = N2}) -> cmp(N1, N2); cmp_pid_info(false, #pid_info{serial = S1}, #pid_info{serial = S2}) -> cmp(S1, S2); cmp_pid_info(true, #pid_info{nodename = NN, creation = C, serial = S, number = N1}, #pid_info{nodename = NN, creation = C, serial = S, number = N2}) -> cmp(N1, N2); cmp_pid_info(true, #pid_info{nodename = NN, creation = C, serial = S1}, #pid_info{nodename = NN, creation = C, serial = S2}) -> cmp(S1, S2); cmp_pid_info(true, #pid_info{nodename = NN, creation = C1}, #pid_info{nodename = NN, creation = C2}) -> cmp(C1, C2); cmp_pid_info(true, #pid_info{nodename = NN1}, #pid_info{nodename = NN2}) -> cmp(NN1, NN2). cmp(X, X) -> 0; cmp(X1, X2) when X1 < X2 -> -1; cmp(_X1, _X2) -> 1. pid_to_pid_info(Pid) when is_pid(Pid) -> [?VERSION_MAGIC, ?PID_EXT, ?ATOM_EXT, NNL1, NNL0 | Rest] = binary_to_list(term_to_binary(Pid)), [N3, N2, N1, N0, S3, S2, S1, S0, Creation] = drop(bytes2int(NNL1, NNL0), Rest), #pid_info{serial = bytes2int(S3, S2, S1, S0), number = bytes2int(N3, N2, N1, N0), nodename = node(Pid), creation = Creation}. drop(0, L) -> L; drop(N, [_|L]) when is_integer(N), N > 0 -> drop(N-1, L); drop(N, []) when is_integer(N), N > 0 -> []. bytes2int(N1, N0) when 0 =< N1, N1 =< 255, 0 =< N0, N0 =< 255 -> (N1 bsl 8) bor N0. bytes2int(N3, N2, N1, N0) when 0 =< N3, N3 =< 255, 0 =< N2, N2 =< 255, 0 =< N1, N1 =< 255, 0 =< N0, N0 =< 255 -> (N3 bsl 24) bor (N2 bsl 16) bor (N1 bsl 8) bor N0.