aboutsummaryrefslogtreecommitdiffstats
path: root/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl')
-rw-r--r--lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl2173
1 files changed, 0 insertions, 2173 deletions
diff --git a/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl b/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl
deleted file mode 100644
index 7bee382a89..0000000000
--- a/lib/dialyzer/test/r9c_tests_SUITE_data/src/mnesia/mnesia_tm.erl
+++ /dev/null
@@ -1,2173 +0,0 @@
-%% ``The contents of this file are subject to the Erlang Public License,
-%% Version 1.1, (the "License"); you may not use this file except in
-%% compliance with the License. You should have received a copy of the
-%% Erlang Public License along with this software. If not, it can be
-%% retrieved via the world wide web at http://www.erlang.org/.
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Initial Developer of the Original Code is Ericsson Utvecklings AB.
-%% Portions created by Ericsson are Copyright 1999, Ericsson Utvecklings
-%% AB. All Rights Reserved.''
-%%
-%% $Id: mnesia_tm.erl,v 1.2 2010/03/04 13:54:20 maria Exp $
-%%
--module(mnesia_tm).
-
--export([
- start/0,
- init/1,
- non_transaction/5,
- transaction/6,
- commit_participant/5,
- dirty/2,
- display_info/2,
- do_update_op/3,
- get_info/1,
- get_transactions/0,
- info/1,
- mnesia_down/1,
- prepare_checkpoint/2,
- prepare_checkpoint/1, % Internal
- prepare_snmp/3,
- do_snmp/2,
- put_activity_id/1,
- block_tab/1,
- unblock_tab/1
- ]).
-
-%% sys callback functions
--export([system_continue/3,
- system_terminate/4,
- system_code_change/4
- ]).
-
--include("mnesia.hrl").
--import(mnesia_lib, [set/2]).
--import(mnesia_lib, [fatal/2, verbose/2, dbg_out/2]).
-
--record(state, {coordinators = [], participants = [], supervisor,
- blocked_tabs = [], dirty_queue = []}).
-%% Format on coordinators is [{Tid, EtsTabList} .....
-
--record(prep, {protocol = sym_trans,
- %% async_dirty | sync_dirty | sym_trans | sync_sym_trans | asym_trans
- records = [],
- prev_tab = [], % initiate to a non valid table name
- prev_types,
- prev_snmp,
- types
- }).
-
--record(participant, {tid, pid, commit, disc_nodes = [],
- ram_nodes = [], protocol = sym_trans}).
-
-start() ->
- mnesia_monitor:start_proc(?MODULE, ?MODULE, init, [self()]).
-
-init(Parent) ->
- register(?MODULE, self()),
- process_flag(trap_exit, true),
-
- %% Initialize the schema
- IgnoreFallback = mnesia_monitor:get_env(ignore_fallback_at_startup),
- mnesia_bup:tm_fallback_start(IgnoreFallback),
- mnesia_schema:init(IgnoreFallback),
-
- %% Handshake and initialize transaction recovery
- mnesia_recover:init(),
- Early = mnesia_monitor:init(),
- AllOthers = mnesia_lib:uniq(Early ++ mnesia_lib:all_nodes()) -- [node()],
- set(original_nodes, AllOthers),
- mnesia_recover:connect_nodes(AllOthers),
-
- %% Recover transactions, may wait for decision
- case mnesia_monitor:use_dir() of
- true ->
- P = mnesia_dumper:opt_dump_log(startup), % previous log
- L = mnesia_dumper:opt_dump_log(startup), % latest log
- Msg = "Initial dump of log during startup: ~p~n",
- mnesia_lib:verbose(Msg, [[P, L]]),
- mnesia_log:init();
- false ->
- ignore
- end,
-
- mnesia_schema:purge_tmp_files(),
- mnesia_recover:start_garb(),
-
- ?eval_debug_fun({?MODULE, init}, [{nodes, AllOthers}]),
-
- case val(debug) of
- Debug when Debug /= debug, Debug /= trace ->
- ignore;
- _ ->
- mnesia_subscr:subscribe(whereis(mnesia_event), {table, schema})
- end,
- proc_lib:init_ack(Parent, {ok, self()}),
- doit_loop(#state{supervisor = Parent}).
-
-val(Var) ->
- case ?catch_val(Var) of
- {'EXIT', _ReASoN_} -> mnesia_lib:other_val(Var, _ReASoN_);
- _VaLuE_ -> _VaLuE_
- end.
-
-reply({From,Ref}, R) ->
- From ! {?MODULE, Ref, R};
-reply(From, R) ->
- From ! {?MODULE, node(), R}.
-
-reply(From, R, State) ->
- reply(From, R),
- doit_loop(State).
-
-req(R) ->
- case whereis(?MODULE) of
- undefined ->
- {error, {node_not_running, node()}};
- Pid ->
- Ref = make_ref(),
- Pid ! {{self(), Ref}, R},
- rec(Pid, Ref)
- end.
-
-rec() ->
- rec(whereis(?MODULE)).
-
-rec(Pid) when pid(Pid) ->
- receive
- {?MODULE, _, Reply} ->
- Reply;
-
- {'EXIT', Pid, _} ->
- {error, {node_not_running, node()}}
- end;
-rec(undefined) ->
- {error, {node_not_running, node()}}.
-
-rec(Pid, Ref) ->
- receive
- {?MODULE, Ref, Reply} ->
- Reply;
- {'EXIT', Pid, _} ->
- {error, {node_not_running, node()}}
- end.
-
-tmlink({From, Ref}) when reference(Ref) ->
- link(From);
-tmlink(From) ->
- link(From).
-tmpid({Pid, _Ref}) when pid(Pid) ->
- Pid;
-tmpid(Pid) ->
- Pid.
-
-%% Returns a list of participant transaction Tid's
-mnesia_down(Node) ->
- %% Syncronously call needed in order to avoid
- %% race with mnesia_tm's coordinator processes
- %% that may restart and acquire new locks.
- %% mnesia_monitor takes care of the sync
- case whereis(?MODULE) of
- undefined ->
- mnesia_monitor:mnesia_down(?MODULE, {Node, []});
- Pid ->
- Pid ! {mnesia_down, Node}
- end.
-
-prepare_checkpoint(Nodes, Cp) ->
- rpc:multicall(Nodes, ?MODULE, prepare_checkpoint, [Cp]).
-
-prepare_checkpoint(Cp) ->
- req({prepare_checkpoint,Cp}).
-
-block_tab(Tab) ->
- req({block_tab, Tab}).
-
-unblock_tab(Tab) ->
- req({unblock_tab, Tab}).
-
-doit_loop(#state{coordinators = Coordinators, participants = Participants, supervisor = Sup}
- = State) ->
- receive
- {_From, {async_dirty, Tid, Commit, Tab}} ->
- case lists:member(Tab, State#state.blocked_tabs) of
- false ->
- do_async_dirty(Tid, Commit, Tab),
- doit_loop(State);
- true ->
- Item = {async_dirty, Tid, Commit, Tab},
- State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
- doit_loop(State2)
- end;
-
- {From, {sync_dirty, Tid, Commit, Tab}} ->
- case lists:member(Tab, State#state.blocked_tabs) of
- false ->
- do_sync_dirty(From, Tid, Commit, Tab),
- doit_loop(State);
- true ->
- Item = {sync_dirty, From, Tid, Commit, Tab},
- State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
- doit_loop(State2)
- end;
-
- {From, start_outer} -> %% Create and associate ets_tab with Tid
- case catch ?ets_new_table(mnesia_trans_store, [bag, public]) of
- {'EXIT', Reason} -> %% system limit
- Msg = "Cannot create an ets table for the "
- "local transaction store",
- reply(From, {error, {system_limit, Msg, Reason}}, State);
- Etab ->
- tmlink(From),
- C = mnesia_recover:incr_trans_tid_serial(),
- ?ets_insert(Etab, {nodes, node()}),
- Tid = #tid{pid = tmpid(From), counter = C},
- A2 = [{Tid , [Etab]} | Coordinators],
- S2 = State#state{coordinators = A2},
- reply(From, {new_tid, Tid, Etab}, S2)
- end;
-
- {From, {ask_commit, Protocol, Tid, Commit, DiscNs, RamNs}} ->
- ?eval_debug_fun({?MODULE, doit_ask_commit},
- [{tid, Tid}, {prot, Protocol}]),
- mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
- Pid =
- case Protocol of
- asym_trans when node(Tid#tid.pid) /= node() ->
- Args = [tmpid(From), Tid, Commit, DiscNs, RamNs],
- spawn_link(?MODULE, commit_participant, Args);
- _ when node(Tid#tid.pid) /= node() -> %% *_sym_trans
- reply(From, {vote_yes, Tid}),
- nopid
- end,
- P = #participant{tid = Tid,
- pid = Pid,
- commit = Commit,
- disc_nodes = DiscNs,
- ram_nodes = RamNs,
- protocol = Protocol},
- State2 = State#state{participants = [P | Participants]},
- doit_loop(State2);
-
- {Tid, do_commit} ->
- case mnesia_lib:key_search_delete(Tid, #participant.tid, Participants) of
- {none, _} ->
- verbose("Tried to commit a non participant transaction ~p~n",
- [Tid]),
- doit_loop(State);
- {P, Participants2} ->
- ?eval_debug_fun({?MODULE, do_commit, pre},
- [{tid, Tid}, {participant, P}]),
- case P#participant.pid of
- nopid ->
- Commit = P#participant.commit,
- Member = lists:member(node(), P#participant.disc_nodes),
- if Member == false ->
- ignore;
- P#participant.protocol == sym_trans ->
- mnesia_log:log(Commit);
- P#participant.protocol == sync_sym_trans ->
- mnesia_log:slog(Commit)
- end,
- mnesia_recover:note_decision(Tid, committed),
- do_commit(Tid, Commit),
- if
- P#participant.protocol == sync_sym_trans ->
- Tid#tid.pid ! {?MODULE, node(), {committed, Tid}};
- true ->
- ignore
- end,
- mnesia_locker:release_tid(Tid),
- transaction_terminated(Tid),
- ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, nopid}]),
- doit_loop(State#state{participants = Participants2});
- Pid when pid(Pid) ->
- Pid ! {Tid, committed},
- ?eval_debug_fun({?MODULE, do_commit, post}, [{tid, Tid}, {pid, Pid}]),
- doit_loop(State)
- end
- end;
-
- {Tid, simple_commit} ->
- mnesia_recover:note_decision(Tid, committed),
- mnesia_locker:release_tid(Tid),
- transaction_terminated(Tid),
- doit_loop(State);
-
- {Tid, {do_abort, Reason}} ->
- ?eval_debug_fun({?MODULE, do_abort, pre}, [{tid, Tid}]),
- mnesia_locker:release_tid(Tid),
- case mnesia_lib:key_search_delete(Tid, #participant.tid, Participants) of
- {none, _} ->
- verbose("Tried to abort a non participant transaction ~p: ~p~n",
- [Tid, Reason]),
- doit_loop(State);
- {P, Participants2} ->
- case P#participant.pid of
- nopid ->
- Commit = P#participant.commit,
- mnesia_recover:note_decision(Tid, aborted),
- do_abort(Tid, Commit),
- if
- P#participant.protocol == sync_sym_trans ->
- Tid#tid.pid ! {?MODULE, node(), {aborted, Tid}};
- true ->
- ignore
- end,
- transaction_terminated(Tid),
- ?eval_debug_fun({?MODULE, do_abort, post}, [{tid, Tid}, {pid, nopid}]),
- doit_loop(State#state{participants = Participants2});
- Pid when pid(Pid) ->
- Pid ! {Tid, {do_abort, Reason}},
- ?eval_debug_fun({?MODULE, do_abort, post},
- [{tid, Tid}, {pid, Pid}]),
- doit_loop(State)
- end
- end;
-
- {From, {add_store, Tid}} -> %% new store for nested transaction
- case catch ?ets_new_table(mnesia_trans_store, [bag, public]) of
- {'EXIT', Reason} -> %% system limit
- Msg = "Cannot create an ets table for a nested "
- "local transaction store",
- reply(From, {error, {system_limit, Msg, Reason}}, State);
- Etab ->
- A2 = add_coord_store(Coordinators, Tid, Etab),
- reply(From, {new_store, Etab},
- State#state{coordinators = A2})
- end;
-
- {From, {del_store, Tid, Current, Obsolete, PropagateStore}} ->
- opt_propagate_store(Current, Obsolete, PropagateStore),
- A2 = del_coord_store(Coordinators, Tid, Current, Obsolete),
- reply(From, store_erased, State#state{coordinators = A2});
-
- {'EXIT', Pid, Reason} ->
- handle_exit(Pid, Reason, State);
-
- {From, {restart, Tid, Store}} ->
- A2 = restore_stores(Coordinators, Tid, Store),
- ?ets_match_delete(Store, '_'),
- ?ets_insert(Store, {nodes, node()}),
- reply(From, {restarted, Tid}, State#state{coordinators = A2});
-
- {delete_transaction, Tid} ->
- %% used to clear transactions which are committed
- %% in coordinator or participant processes
- case mnesia_lib:key_search_delete(Tid, #participant.tid, Participants) of
- {none, _} ->
- case mnesia_lib:key_search_delete(Tid, 1, Coordinators) of
- {none, _} ->
- verbose("** ERROR ** Tried to delete a non transaction ~p~n",
- [Tid]),
- doit_loop(State);
- {{_Tid, Etabs}, A2} ->
- erase_ets_tabs(Etabs),
- transaction_terminated(Tid),
- doit_loop(State#state{coordinators = A2})
- end;
- {_P, Participants2} ->
- transaction_terminated(Tid),
- State2 = State#state{participants = Participants2},
- doit_loop(State2)
- end;
-
- {sync_trans_serial, Tid} ->
- %% Do the Lamport thing here
- mnesia_recover:sync_trans_tid_serial(Tid),
- doit_loop(State);
-
- {From, info} ->
- reply(From, {info, Participants, Coordinators}, State);
-
- {mnesia_down, N} ->
- verbose("Got mnesia_down from ~p, reconfiguring...~n", [N]),
- reconfigure_coordinators(N, Coordinators),
-
- Tids = [P#participant.tid || P <- Participants],
- reconfigure_participants(N, Participants),
- mnesia_monitor:mnesia_down(?MODULE, {N, Tids}),
- doit_loop(State);
-
- {From, {unblock_me, Tab}} ->
- case lists:member(Tab, State#state.blocked_tabs) of
- false ->
- verbose("Wrong dirty Op blocked on ~p ~p ~p",
- [node(), Tab, From]),
- reply(From, unblocked),
- doit_loop(State);
- true ->
- Item = {Tab, unblock_me, From},
- State2 = State#state{dirty_queue = [Item | State#state.dirty_queue]},
- doit_loop(State2)
- end;
-
- {From, {block_tab, Tab}} ->
- State2 = State#state{blocked_tabs = [Tab | State#state.blocked_tabs]},
- reply(From, ok, State2);
-
- {From, {unblock_tab, Tab}} ->
- BlockedTabs2 = State#state.blocked_tabs -- [Tab],
- case lists:member(Tab, BlockedTabs2) of
- false ->
- mnesia_controller:unblock_table(Tab),
- Queue = process_dirty_queue(Tab, State#state.dirty_queue),
- State2 = State#state{blocked_tabs = BlockedTabs2,
- dirty_queue = Queue},
- reply(From, ok, State2);
- true ->
- State2 = State#state{blocked_tabs = BlockedTabs2},
- reply(From, ok, State2)
- end;
-
- {From, {prepare_checkpoint, Cp}} ->
- Res = mnesia_checkpoint:tm_prepare(Cp),
- case Res of
- {ok, _Name, IgnoreNew, _Node} ->
- prepare_pending_coordinators(Coordinators, IgnoreNew),
- prepare_pending_participants(Participants, IgnoreNew);
- {error, _Reason} ->
- ignore
- end,
- reply(From, Res, State);
-
- {system, From, Msg} ->
- dbg_out("~p got {system, ~p, ~p}~n", [?MODULE, From, Msg]),
- sys:handle_system_msg(Msg, From, Sup, ?MODULE, [], State);
-
- Msg ->
- verbose("** ERROR ** ~p got unexpected message: ~p~n", [?MODULE, Msg]),
- doit_loop(State)
- end.
-
-do_sync_dirty(From, Tid, Commit, _Tab) ->
- ?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]),
- Res = (catch do_dirty(Tid, Commit)),
- ?eval_debug_fun({?MODULE, sync_dirty, post}, [{tid, Tid}]),
- From ! {?MODULE, node(), {dirty_res, Res}}.
-
-do_async_dirty(Tid, Commit, _Tab) ->
- ?eval_debug_fun({?MODULE, async_dirty, pre}, [{tid, Tid}]),
- catch do_dirty(Tid, Commit),
- ?eval_debug_fun({?MODULE, async_dirty, post}, [{tid, Tid}]).
-
-%% Process items in fifo order
-process_dirty_queue(Tab, [Item | Queue]) ->
- Queue2 = process_dirty_queue(Tab, Queue),
- case Item of
- {async_dirty, Tid, Commit, Tab} ->
- do_async_dirty(Tid, Commit, Tab),
- Queue2;
- {sync_dirty, From, Tid, Commit, Tab} ->
- do_sync_dirty(From, Tid, Commit, Tab),
- Queue2;
- {Tab, unblock_me, From} ->
- reply(From, unblocked),
- Queue2;
- _ ->
- [Item | Queue2]
- end;
-process_dirty_queue(_Tab, []) ->
- [].
-
-prepare_pending_coordinators([{Tid, [Store | _Etabs]} | Coords], IgnoreNew) ->
- case catch ?ets_lookup(Store, pending) of
- [] ->
- prepare_pending_coordinators(Coords, IgnoreNew);
- [Pending] ->
- case lists:member(Tid, IgnoreNew) of
- false ->
- mnesia_checkpoint:tm_enter_pending(Pending);
- true ->
- ignore
- end,
- prepare_pending_coordinators(Coords, IgnoreNew);
- {'EXIT', _} ->
- prepare_pending_coordinators(Coords, IgnoreNew)
- end;
-prepare_pending_coordinators([], _IgnoreNew) ->
- ok.
-
-prepare_pending_participants([Part | Parts], IgnoreNew) ->
- Tid = Part#participant.tid,
- D = Part#participant.disc_nodes,
- R = Part#participant.ram_nodes,
- case lists:member(Tid, IgnoreNew) of
- false ->
- mnesia_checkpoint:tm_enter_pending(Tid, D, R);
- true ->
- ignore
- end,
- prepare_pending_participants(Parts, IgnoreNew);
-prepare_pending_participants([], _IgnoreNew) ->
- ok.
-
-handle_exit(Pid, Reason, State) when node(Pid) /= node() ->
- %% We got exit from a remote fool
- dbg_out("~p got remote EXIT from unknown ~p~n",
- [?MODULE, {Pid, Reason}]),
- doit_loop(State);
-
-handle_exit(Pid, _Reason, State) when Pid == State#state.supervisor ->
- %% Our supervisor has died, time to stop
- do_stop(State);
-
-handle_exit(Pid, Reason, State) ->
- %% Check if it is a coordinator
- case pid_search_delete(Pid, State#state.coordinators) of
- {none, _} ->
- %% Check if it is a participant
- case mnesia_lib:key_search_delete(Pid, #participant.pid, State#state.participants) of
- {none, _} ->
- %% We got exit from a local fool
- verbose("** ERROR ** ~p got local EXIT from unknown process: ~p~n",
- [?MODULE, {Pid, Reason}]),
- doit_loop(State);
-
- {P, RestP} when record(P, participant) ->
- fatal("Participant ~p in transaction ~p died ~p~n",
- [P#participant.pid, P#participant.tid, Reason]),
- doit_loop(State#state{participants = RestP})
- end;
-
- {{Tid, Etabs}, RestC} ->
- %% A local coordinator has died and
- %% we must determine the outcome of the
- %% transaction and tell mnesia_tm on the
- %% other nodes about it and then recover
- %% locally.
- recover_coordinator(Tid, Etabs),
- doit_loop(State#state{coordinators = RestC})
- end.
-
-recover_coordinator(Tid, Etabs) ->
- verbose("Coordinator ~p in transaction ~p died.~n", [Tid#tid.pid, Tid]),
-
- Store = hd(Etabs),
- CheckNodes = get_nodes(Store),
- TellNodes = CheckNodes -- [node()],
- case catch arrange(Tid, Store, async) of
- {'EXIT', Reason} ->
- dbg_out("Recovery of coordinator ~p failed:~n", [Tid, Reason]),
- Protocol = asym_trans,
- tell_outcome(Tid, Protocol, node(), CheckNodes, TellNodes);
- {_N, Prep} ->
- %% Tell the participants about the outcome
- Protocol = Prep#prep.protocol,
- Outcome = tell_outcome(Tid, Protocol, node(), CheckNodes, TellNodes),
-
- %% Recover locally
- CR = Prep#prep.records,
- {DiscNs, RamNs} = commit_nodes(CR, [], []),
- {value, Local} = lists:keysearch(node(), #commit.node, CR),
-
- ?eval_debug_fun({?MODULE, recover_coordinator, pre},
- [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}]),
- recover_coordinator(Tid, Protocol, Outcome, Local, DiscNs, RamNs),
- ?eval_debug_fun({?MODULE, recover_coordinator, post},
- [{tid, Tid}, {outcome, Outcome}, {prot, Protocol}])
-
- end,
- erase_ets_tabs(Etabs),
- transaction_terminated(Tid),
- mnesia_locker:release_tid(Tid).
-
-recover_coordinator(Tid, sym_trans, committed, Local, _, _) ->
- mnesia_recover:note_decision(Tid, committed),
- do_dirty(Tid, Local);
-recover_coordinator(Tid, sym_trans, aborted, _Local, _, _) ->
- mnesia_recover:note_decision(Tid, aborted);
-recover_coordinator(Tid, sync_sym_trans, committed, Local, _, _) ->
- mnesia_recover:note_decision(Tid, committed),
- do_dirty(Tid, Local);
-recover_coordinator(Tid, sync_sym_trans, aborted, _Local, _, _) ->
- mnesia_recover:note_decision(Tid, aborted);
-
-recover_coordinator(Tid, asym_trans, committed, Local, DiscNs, RamNs) ->
- D = #decision{tid = Tid, outcome = committed,
- disc_nodes = DiscNs, ram_nodes = RamNs},
- mnesia_recover:log_decision(D),
- do_commit(Tid, Local);
-recover_coordinator(Tid, asym_trans, aborted, Local, DiscNs, RamNs) ->
- D = #decision{tid = Tid, outcome = aborted,
- disc_nodes = DiscNs, ram_nodes = RamNs},
- mnesia_recover:log_decision(D),
- do_abort(Tid, Local).
-
-restore_stores([{Tid, Etstabs} | Tail], Tid, Store) ->
- Remaining = lists:delete(Store, Etstabs),
- erase_ets_tabs(Remaining),
- [{Tid, [Store]} | Tail];
-restore_stores([H | T], Tid, Store) ->
- [H | restore_stores(T, Tid, Store)].
-%% No NIL case on purpose
-
-add_coord_store([{Tid, Stores} | Coordinators], Tid, Etab) ->
- [{Tid, [Etab | Stores]} | Coordinators];
-add_coord_store([H | T], Tid, Etab) ->
- [H | add_coord_store(T, Tid, Etab)].
-%% no NIL case on purpose
-
-del_coord_store([{Tid, Stores} | Coordinators], Tid, Current, Obsolete) ->
- Rest =
- case Stores of
- [Obsolete, Current | Tail] -> Tail;
- [Current, Obsolete | Tail] -> Tail
- end,
- ?ets_delete_table(Obsolete),
- [{Tid, [Current | Rest]} | Coordinators];
-del_coord_store([H | T], Tid, Current, Obsolete) ->
- [H | del_coord_store(T, Tid, Current, Obsolete)].
-%% no NIL case on purpose
-
-erase_ets_tabs([H | T]) ->
- ?ets_delete_table(H),
- erase_ets_tabs(T);
-erase_ets_tabs([]) ->
- ok.
-
-%% Deletes a pid from a list of participants
-%% or from a list of coordinators and returns
-%% {none, All} or {Tr, Rest}
-pid_search_delete(Pid, Trs) ->
- pid_search_delete(Pid, Trs, none, []).
-pid_search_delete(Pid, [Tr = {Tid, _Ts} | Trs], _Val, Ack) when Tid#tid.pid == Pid ->
- pid_search_delete(Pid, Trs, Tr, Ack);
-pid_search_delete(Pid, [Tr | Trs], Val, Ack) ->
- pid_search_delete(Pid, Trs, Val, [Tr | Ack]);
-
-pid_search_delete(_Pid, [], Val, Ack) ->
- {Val, Ack}.
-
-%% When TM gets an EXIT sig, we must also check to see
-%% if the crashing transaction is in the Participant list
-%%
-%% search_participant_for_pid([Participant | Tail], Pid) ->
-%% Tid = Participant#participant.tid,
-%% if
-%% Tid#tid.pid == Pid ->
-%% {coordinator, Participant};
-%% Participant#participant.pid == Pid ->
-%% {participant, Participant};
-%% true ->
-%% search_participant_for_pid(Tail, Pid)
-%% end;
-%% search_participant_for_pid([], _) ->
-%% fool.
-
-transaction_terminated(Tid) ->
- mnesia_checkpoint:tm_exit_pending(Tid),
- Pid = Tid#tid.pid,
- if
- node(Pid) == node() ->
- unlink(Pid);
- true -> %% Do the Lamport thing here
- mnesia_recover:sync_trans_tid_serial(Tid)
- end.
-
-non_transaction(OldState, Fun, Args, ActivityKind, Mod) ->
- Id = {ActivityKind, self()},
- NewState = {Mod, Id, non_transaction},
- put(mnesia_activity_state, NewState),
- %% I Want something uniqe here, references are expensive
- Ref = mNeSia_nOn_TrAnSacTioN,
- RefRes = (catch {Ref, apply(Fun, Args)}),
- case OldState of
- undefined -> erase(mnesia_activity_state);
- _ -> put(mnesia_activity_state, OldState)
- end,
- case RefRes of
- {Ref, Res} ->
- case Res of
- {'EXIT', Reason} -> exit(Reason);
- {aborted, Reason} -> mnesia:abort(Reason);
- _ -> Res
- end;
- {'EXIT', Reason} ->
- exit(Reason);
- Throw ->
- throw(Throw)
- end.
-
-transaction(OldTidTs, Fun, Args, Retries, Mod, Type) ->
- Factor = 1,
- case OldTidTs of
- undefined -> % Outer
- execute_outer(Mod, Fun, Args, Factor, Retries, Type);
- {_OldMod, Tid, Ts} -> % Nested
- execute_inner(Mod, Tid, Ts, Fun, Args, Factor, Retries, Type);
- _ -> % Bad nesting
- {aborted, nested_transaction}
- end.
-
-execute_outer(Mod, Fun, Args, Factor, Retries, Type) ->
- case req(start_outer) of
- {error, Reason} ->
- {aborted, Reason};
- {new_tid, Tid, Store} ->
- Ts = #tidstore{store = Store},
- NewTidTs = {Mod, Tid, Ts},
- put(mnesia_activity_state, NewTidTs),
- execute_transaction(Fun, Args, Factor, Retries, Type)
- end.
-
-execute_inner(Mod, Tid, Ts, Fun, Args, Factor, Retries, Type) ->
- case req({add_store, Tid}) of
- {error, Reason} ->
- {aborted, Reason};
- {new_store, Ets} ->
- copy_ets(Ts#tidstore.store, Ets),
- Up = [Ts#tidstore.store | Ts#tidstore.up_stores],
- NewTs = Ts#tidstore{level = 1 + Ts#tidstore.level,
- store = Ets,
- up_stores = Up},
- NewTidTs = {Mod, Tid, NewTs},
- put(mnesia_activity_state, NewTidTs),
- execute_transaction(Fun, Args, Factor, Retries, Type)
- end.
-
-copy_ets(From, To) ->
- do_copy_ets(?ets_first(From), From, To).
-do_copy_ets('$end_of_table', _,_) ->
- ok;
-do_copy_ets(K, From, To) ->
- Objs = ?ets_lookup(From, K),
- insert_objs(Objs, To),
- do_copy_ets(?ets_next(From, K), From, To).
-
-insert_objs([H|T], Tab) ->
- ?ets_insert(Tab, H),
- insert_objs(T, Tab);
-insert_objs([], _Tab) ->
- ok.
-
-execute_transaction(Fun, Args, Factor, Retries, Type) ->
- case catch apply_fun(Fun, Args, Type) of
- {'EXIT', Reason} ->
- check_exit(Fun, Args, Factor, Retries, Reason, Type);
- {'atomic', Value} ->
- mnesia_lib:incr_counter(trans_commits),
- erase(mnesia_activity_state),
- %% no need to clear locks, already done by commit ...
- %% Flush any un processed mnesia_down messages we might have
- flush_downs(),
- {'atomic', Value};
- {nested_atomic, Value} ->
- mnesia_lib:incr_counter(trans_commits),
- {'atomic', Value};
- Value -> %% User called throw
- Reason = {aborted, {throw, Value}},
- return_abort(Fun, Args, Reason)
- end.
-
-apply_fun(Fun, Args, Type) ->
- Result = apply(Fun, Args),
- case t_commit(Type) of
- do_commit ->
- {'atomic', Result};
- do_commit_nested ->
- {nested_atomic, Result};
- {do_abort, {aborted, Reason}} ->
- {'EXIT', {aborted, Reason}};
- {do_abort, Reason} ->
- {'EXIT', {aborted, Reason}}
- end.
-
-check_exit(Fun, Args, Factor, Retries, Reason, Type) ->
- case Reason of
- {aborted, C} when record(C, cyclic) ->
- maybe_restart(Fun, Args, Factor, Retries, Type, C);
- {aborted, {node_not_running, N}} ->
- maybe_restart(Fun, Args, Factor, Retries, Type, {node_not_running, N});
- {aborted, {bad_commit, N}} ->
- maybe_restart(Fun, Args, Factor, Retries, Type, {bad_commit, N});
- _ ->
- return_abort(Fun, Args, Reason)
- end.
-
-maybe_restart(Fun, Args, Factor, Retries, Type, Why) ->
- {Mod, Tid, Ts} = get(mnesia_activity_state),
- case try_again(Retries) of
- yes when Ts#tidstore.level == 1 ->
- restart(Mod, Tid, Ts, Fun, Args, Factor, Retries, Type, Why);
- yes ->
- return_abort(Fun, Args, Why);
- no ->
- return_abort(Fun, Args, {aborted, nomore})
- end.
-
-try_again(infinity) -> yes;
-try_again(X) when number(X) , X > 1 -> yes;
-try_again(_) -> no.
-
-%% We can only restart toplevel transactions.
-%% If a deadlock situation occurs in a nested transaction
-%% The whole thing including all nested transactions need to be
-%% restarted. The stack is thus popped by a consequtive series of
-%% exit({aborted, #cyclic{}}) calls
-
-restart(Mod, Tid, Ts, Fun, Args, Factor0, Retries0, Type, Why) ->
- mnesia_lib:incr_counter(trans_restarts),
- Retries = decr(Retries0),
- case Why of
- {bad_commit, _N} ->
- return_abort(Fun, Args, Why),
- Factor = 1,
- SleepTime = mnesia_lib:random_time(Factor, Tid#tid.counter),
- dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
- timer:sleep(SleepTime),
- execute_outer(Mod, Fun, Args, Factor, Retries, Type);
- {node_not_running, _N} -> %% Avoids hanging in receive_release_tid_ack
- return_abort(Fun, Args, Why),
- Factor = 1,
- SleepTime = mnesia_lib:random_time(Factor, Tid#tid.counter),
- dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
- timer:sleep(SleepTime),
- execute_outer(Mod, Fun, Args, Factor, Retries, Type);
- _ ->
- SleepTime = mnesia_lib:random_time(Factor0, Tid#tid.counter),
- dbg_out("Restarting transaction ~w: in ~wms ~w~n", [Tid, SleepTime, Why]),
-
- if
- Factor0 /= 10 ->
- ignore;
- true ->
- %% Our serial may be much larger than other nodes ditto
- AllNodes = val({current, db_nodes}),
- verbose("Sync serial ~p~n", [Tid]),
- rpc:abcast(AllNodes, ?MODULE, {sync_trans_serial, Tid})
- end,
- intercept_friends(Tid, Ts),
- Store = Ts#tidstore.store,
- Nodes = get_nodes(Store),
- ?MODULE ! {self(), {restart, Tid, Store}},
- mnesia_locker:send_release_tid(Nodes, Tid),
- timer:sleep(SleepTime),
- mnesia_locker:receive_release_tid_acc(Nodes, Tid),
- case rec() of
- {restarted, Tid} ->
- execute_transaction(Fun, Args, Factor0 + 1,
- Retries, Type);
- {error, Reason} ->
- mnesia:abort(Reason)
- end
- end.
-
-decr(infinity) -> infinity;
-decr(X) when integer(X), X > 1 -> X - 1;
-decr(_X) -> 0.
-
-return_abort(Fun, Args, Reason) ->
- {Mod, Tid, Ts} = get(mnesia_activity_state),
- OldStore = Ts#tidstore.store,
- Nodes = get_nodes(OldStore),
- intercept_friends(Tid, Ts),
- catch mnesia_lib:incr_counter(trans_failures),
- Level = Ts#tidstore.level,
- if
- Level == 1 ->
- mnesia_locker:async_release_tid(Nodes, Tid),
- ?MODULE ! {delete_transaction, Tid},
- erase(mnesia_activity_state),
- dbg_out("Transaction ~p calling ~p with ~p, failed ~p~n",
- [Tid, Fun, Args, Reason]),
- flush_downs(),
- {aborted, mnesia_lib:fix_error(Reason)};
- true ->
- %% Nested transaction
- [NewStore | Tail] = Ts#tidstore.up_stores,
- req({del_store, Tid, NewStore, OldStore, true}),
- Ts2 = Ts#tidstore{store = NewStore,
- up_stores = Tail,
- level = Level - 1},
- NewTidTs = {Mod, Tid, Ts2},
- put(mnesia_activity_state, NewTidTs),
- case Reason of
- #cyclic{} ->
- exit({aborted, Reason});
- {node_not_running, _N} ->
- exit({aborted, Reason});
- {bad_commit, _N}->
- exit({aborted, Reason});
- _ ->
- {aborted, mnesia_lib:fix_error(Reason)}
- end
- end.
-
-flush_downs() ->
- receive
- {?MODULE, _, _} -> flush_downs(); % Votes
- {mnesia_down, _} -> flush_downs()
- after 0 -> flushed
- end.
-
-put_activity_id(undefined) ->
- erase_activity_id();
-put_activity_id({Mod, Tid, Ts}) when record(Tid, tid), record(Ts, tidstore) ->
- flush_downs(),
- Store = Ts#tidstore.store,
- ?ets_insert(Store, {friends, self()}),
- NewTidTs = {Mod, Tid, Ts},
- put(mnesia_activity_state, NewTidTs);
-put_activity_id(SimpleState) ->
- put(mnesia_activity_state, SimpleState).
-
-erase_activity_id() ->
- flush_downs(),
- erase(mnesia_activity_state).
-
-get_nodes(Store) ->
- case catch ?ets_lookup_element(Store, nodes, 2) of
- {'EXIT', _} -> [node()];
- Nodes -> Nodes
- end.
-
-get_friends(Store) ->
- case catch ?ets_lookup_element(Store, friends, 2) of
- {'EXIT', _} -> [];
- Friends -> Friends
- end.
-
-opt_propagate_store(_Current, _Obsolete, false) ->
- ok;
-opt_propagate_store(Current, Obsolete, true) ->
- propagate_store(Current, nodes, get_nodes(Obsolete)),
- propagate_store(Current, friends, get_friends(Obsolete)).
-
-propagate_store(Store, Var, [Val | Vals]) ->
- ?ets_insert(Store, {Var, Val}),
- propagate_store(Store, Var, Vals);
-propagate_store(_Store, _Var, []) ->
- ok.
-
-%% Tell all processes that are cooperating with the current transaction
-intercept_friends(_Tid, Ts) ->
- Friends = get_friends(Ts#tidstore.store),
- Message = {activity_ended, undefined, self()},
- intercept_best_friend(Friends, Message).
-
-intercept_best_friend([], _Message) ->
- ok;
-intercept_best_friend([Pid | _], Message) ->
- Pid ! Message,
- wait_for_best_friend(Pid, 0).
-
-wait_for_best_friend(Pid, Timeout) ->
- receive
- {'EXIT', Pid, _} -> ok;
- {activity_ended, _, Pid} -> ok
- after Timeout ->
- case my_process_is_alive(Pid) of
- true -> wait_for_best_friend(Pid, 1000);
- false -> ok
- end
- end.
-
-my_process_is_alive(Pid) ->
- case catch erlang:is_process_alive(Pid) of % New BIF in R5
- true ->
- true;
- false ->
- false;
- {'EXIT', _} -> % Pre R5 backward compatibility
- case process_info(Pid, message_queue_len) of
- undefined -> false;
- _ -> true
- end
- end.
-
-dirty(Protocol, Item) ->
- {{Tab, Key}, _Val, _Op} = Item,
- Tid = {dirty, self()},
- Prep = prepare_items(Tid, Tab, Key, [Item], #prep{protocol= Protocol}),
- CR = Prep#prep.records,
- case Protocol of
- async_dirty ->
- %% Send commit records to the other involved nodes,
- %% but do only wait for one node to complete.
- %% Preferrably, the local node if possible.
-
- ReadNode = val({Tab, where_to_read}),
- {WaitFor, FirstRes} = async_send_dirty(Tid, CR, Tab, ReadNode),
- rec_dirty(WaitFor, FirstRes);
-
- sync_dirty ->
- %% Send commit records to the other involved nodes,
- %% and wait for all nodes to complete
- {WaitFor, FirstRes} = sync_send_dirty(Tid, CR, Tab, []),
- rec_dirty(WaitFor, FirstRes);
- _ ->
- mnesia:abort({bad_activity, Protocol})
- end.
-
-%% This is the commit function, The first thing it does,
-%% is to find out which nodes that have been participating
-%% in this particular transaction, all of the mnesia_locker:lock*
-%% functions insert the names of the nodes where it aquires locks
-%% into the local shadow Store
-%% This function exacutes in the context of the user process
-t_commit(Type) ->
- {Mod, Tid, Ts} = get(mnesia_activity_state),
- Store = Ts#tidstore.store,
- if
- Ts#tidstore.level == 1 ->
- intercept_friends(Tid, Ts),
- %% N is number of updates
- case arrange(Tid, Store, Type) of
- {N, Prep} when N > 0 ->
- multi_commit(Prep#prep.protocol,
- Tid, Prep#prep.records, Store);
- {0, Prep} ->
- multi_commit(read_only, Tid, Prep#prep.records, Store)
- end;
- true ->
- %% nested commit
- Level = Ts#tidstore.level,
- [Obsolete | Tail] = Ts#tidstore.up_stores,
- req({del_store, Tid, Store, Obsolete, false}),
- NewTs = Ts#tidstore{store = Store,
- up_stores = Tail,
- level = Level - 1},
- NewTidTs = {Mod, Tid, NewTs},
- put(mnesia_activity_state, NewTidTs),
- do_commit_nested
- end.
-
-%% This function arranges for all objects we shall write in S to be
-%% in a list of {Node, CommitRecord}
-%% Important function for the performance of mnesia.
-
-arrange(Tid, Store, Type) ->
- %% The local node is always included
- Nodes = get_nodes(Store),
- Recs = prep_recs(Nodes, []),
- Key = ?ets_first(Store),
- N = 0,
- Prep =
- case Type of
- async -> #prep{protocol = sym_trans, records = Recs};
- sync -> #prep{protocol = sync_sym_trans, records = Recs}
- end,
- case catch do_arrange(Tid, Store, Key, Prep, N) of
- {'EXIT', Reason} ->
- dbg_out("do_arrange failed ~p ~p~n", [Reason, Tid]),
- case Reason of
- {aborted, R} ->
- mnesia:abort(R);
- _ ->
- mnesia:abort(Reason)
- end;
- {New, Prepared} ->
- {New, Prepared#prep{records = reverse(Prepared#prep.records)}}
- end.
-
-reverse([]) ->
- [];
-reverse([H|R]) when record(H, commit) ->
- [
- H#commit{
- ram_copies = lists:reverse(H#commit.ram_copies),
- disc_copies = lists:reverse(H#commit.disc_copies),
- disc_only_copies = lists:reverse(H#commit.disc_only_copies),
- snmp = lists:reverse(H#commit.snmp)
- }
- | reverse(R)].
-
-prep_recs([N | Nodes], Recs) ->
- prep_recs(Nodes, [#commit{decision = presume_commit, node = N} | Recs]);
-prep_recs([], Recs) ->
- Recs.
-
-%% storage_types is a list of {Node, Storage} tuples
-%% where each tuple represents an active replica
-do_arrange(Tid, Store, {Tab, Key}, Prep, N) ->
- Oid = {Tab, Key},
- Items = ?ets_lookup(Store, Oid), %% Store is a bag
- P2 = prepare_items(Tid, Tab, Key, Items, Prep),
- do_arrange(Tid, Store, ?ets_next(Store, Oid), P2, N + 1);
-do_arrange(Tid, Store, SchemaKey, Prep, N) when SchemaKey == op ->
- Items = ?ets_lookup(Store, SchemaKey), %% Store is a bag
- P2 = prepare_schema_items(Tid, Items, Prep),
- do_arrange(Tid, Store, ?ets_next(Store, SchemaKey), P2, N + 1);
-do_arrange(Tid, Store, RestoreKey, Prep, N) when RestoreKey == restore_op ->
- [{restore_op, R}] = ?ets_lookup(Store, RestoreKey),
- Fun = fun({Tab, Key}, CommitRecs, _RecName, Where, Snmp) ->
- Item = [{{Tab, Key}, {Tab, Key}, delete}],
- do_prepare_items(Tid, Tab, Key, Where, Snmp, Item, CommitRecs);
- (BupRec, CommitRecs, RecName, Where, Snmp) ->
- Tab = element(1, BupRec),
- Key = element(2, BupRec),
- Item =
- if
- Tab == RecName ->
- [{{Tab, Key}, BupRec, write}];
- true ->
- BupRec2 = setelement(1, BupRec, RecName),
- [{{Tab, Key}, BupRec2, write}]
- end,
- do_prepare_items(Tid, Tab, Key, Where, Snmp, Item, CommitRecs)
- end,
- Recs2 = mnesia_schema:arrange_restore(R, Fun, Prep#prep.records),
- P2 = Prep#prep{protocol = asym_trans, records = Recs2},
- do_arrange(Tid, Store, ?ets_next(Store, RestoreKey), P2, N + 1);
-do_arrange(_Tid, _Store, '$end_of_table', Prep, N) ->
- {N, Prep};
-do_arrange(Tid, Store, IgnoredKey, Prep, N) -> %% locks, nodes ... local atoms...
- do_arrange(Tid, Store, ?ets_next(Store, IgnoredKey), Prep, N).
-
-%% Returns a prep record with all items in reverse order
-prepare_schema_items(Tid, Items, Prep) ->
- Types = [{N, schema_ops} || N <- val({current, db_nodes})],
- Recs = prepare_nodes(Tid, Types, Items, Prep#prep.records, schema),
- Prep#prep{protocol = asym_trans, records = Recs}.
-
-%% Returns a prep record with all items in reverse order
-prepare_items(Tid, Tab, Key, Items, Prep) when Prep#prep.prev_tab == Tab ->
- Types = Prep#prep.prev_types,
- Snmp = Prep#prep.prev_snmp,
- Recs = Prep#prep.records,
- Recs2 = do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs),
- Prep#prep{records = Recs2};
-
-prepare_items(Tid, Tab, Key, Items, Prep) ->
- Types = val({Tab, where_to_commit}),
- case Types of
- [] -> mnesia:abort({no_exists, Tab});
- {blocked, _} ->
- unblocked = req({unblock_me, Tab}),
- prepare_items(Tid, Tab, Key, Items, Prep);
- _ ->
- Snmp = val({Tab, snmp}),
- Recs2 = do_prepare_items(Tid, Tab, Key, Types,
- Snmp, Items, Prep#prep.records),
- Prep2 = Prep#prep{records = Recs2, prev_tab = Tab,
- prev_types = Types, prev_snmp = Snmp},
- check_prep(Prep2, Types)
- end.
-
-do_prepare_items(Tid, Tab, Key, Types, Snmp, Items, Recs) ->
- Recs2 = prepare_snmp(Tid, Tab, Key, Types, Snmp, Items, Recs), % May exit
- prepare_nodes(Tid, Types, Items, Recs2, normal).
-
-prepare_snmp(Tab, Key, Items) ->
- case val({Tab, snmp}) of
- [] ->
- [];
- Ustruct when Key /= '_' ->
- {_Oid, _Val, Op} = hd(Items),
- %% Still making snmp oid (not used) because we want to catch errors here
- %% And also it keeps backwards comp. with old nodes.
- SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Ustruct), % May exit
- [{Op, Tab, Key, SnmpOid}];
- _ ->
- [{clear_table, Tab}]
- end.
-
-prepare_snmp(_Tid, _Tab, _Key, _Types, [], _Items, Recs) ->
- Recs;
-
-prepare_snmp(Tid, Tab, Key, Types, Us, Items, Recs) ->
- if Key /= '_' ->
- {_Oid, _Val, Op} = hd(Items),
- SnmpOid = mnesia_snmp_hook:key_to_oid(Tab, Key, Us), % May exit
- prepare_nodes(Tid, Types, [{Op, Tab, Key, SnmpOid}], Recs, snmp);
- Key == '_' ->
- prepare_nodes(Tid, Types, [{clear_table, Tab}], Recs, snmp)
- end.
-
-check_prep(Prep, Types) when Prep#prep.types == Types ->
- Prep;
-check_prep(Prep, Types) when Prep#prep.types == undefined ->
- Prep#prep{types = Types};
-check_prep(Prep, _Types) ->
- Prep#prep{protocol = asym_trans}.
-
-%% Returns a list of commit records
-prepare_nodes(Tid, [{Node, Storage} | Rest], Items, C, Kind) ->
- {Rec, C2} = pick_node(Tid, Node, C, []),
- Rec2 = prepare_node(Node, Storage, Items, Rec, Kind),
- [Rec2 | prepare_nodes(Tid, Rest, Items, C2, Kind)];
-prepare_nodes(_Tid, [], _Items, CommitRecords, _Kind) ->
- CommitRecords.
-
-pick_node(Tid, Node, [Rec | Rest], Done) ->
- if
- Rec#commit.node == Node ->
- {Rec, Done ++ Rest};
- true ->
- pick_node(Tid, Node, Rest, [Rec | Done])
- end;
-pick_node(_Tid, Node, [], Done) ->
- {#commit{decision = presume_commit, node = Node}, Done}.
-
-prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind == snmp ->
- Rec2 = Rec#commit{snmp = [Item | Rec#commit.snmp]},
- prepare_node(Node, Storage, Items, Rec2, Kind);
-prepare_node(Node, Storage, [Item | Items], Rec, Kind) when Kind /= schema ->
- Rec2 =
- case Storage of
- ram_copies ->
- Rec#commit{ram_copies = [Item | Rec#commit.ram_copies]};
- disc_copies ->
- Rec#commit{disc_copies = [Item | Rec#commit.disc_copies]};
- disc_only_copies ->
- Rec#commit{disc_only_copies =
- [Item | Rec#commit.disc_only_copies]}
- end,
- prepare_node(Node, Storage, Items, Rec2, Kind);
-prepare_node(_Node, _Storage, Items, Rec, Kind)
- when Kind == schema, Rec#commit.schema_ops == [] ->
- Rec#commit{schema_ops = Items};
-prepare_node(_Node, _Storage, [], Rec, _Kind) ->
- Rec.
-
-%% multi_commit((Protocol, Tid, CommitRecords, Store)
-%% Local work is always performed in users process
-multi_commit(read_only, Tid, CR, _Store) ->
- %% This featherweight commit protocol is used when no
- %% updates has been performed in the transaction.
-
- {DiscNs, RamNs} = commit_nodes(CR, [], []),
- Msg = {Tid, simple_commit},
- rpc:abcast(DiscNs -- [node()], ?MODULE, Msg),
- rpc:abcast(RamNs -- [node()], ?MODULE, Msg),
- mnesia_recover:note_decision(Tid, committed),
- mnesia_locker:release_tid(Tid),
- ?MODULE ! {delete_transaction, Tid},
- do_commit;
-
-multi_commit(sym_trans, Tid, CR, Store) ->
- %% This lightweight commit protocol is used when all
- %% the involved tables are replicated symetrically.
- %% Their storage types must match on each node.
- %%
- %% 1 Ask the other involved nodes if they want to commit
- %% All involved nodes votes yes if they are up
- %% 2a Somebody has voted no
- %% Tell all yes voters to do_abort
- %% 2b Everybody has voted yes
- %% Tell everybody to do_commit. I.e. that they should
- %% prepare the commit, log the commit record and
- %% perform the updates.
- %%
- %% The outcome is kept 3 minutes in the transient decision table.
- %%
- %% Recovery:
- %% If somebody dies before the coordinator has
- %% broadcasted do_commit, the transaction is aborted.
- %%
- %% If a participant dies, the table load algorithm
- %% ensures that the contents of the involved tables
- %% are picked from another node.
- %%
- %% If the coordinator dies, each participants checks
- %% the outcome with all the others. If all are uncertain
- %% about the outcome, the transaction is aborted. If
- %% somebody knows the outcome the others will follow.
-
- {DiscNs, RamNs} = commit_nodes(CR, [], []),
- Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
- ?ets_insert(Store, Pending),
-
- {WaitFor, Local} = ask_commit(sym_trans, Tid, CR, DiscNs, RamNs),
- {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
- ?eval_debug_fun({?MODULE, multi_commit_sym},
- [{tid, Tid}, {outcome, Outcome}]),
- rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
- rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
- case Outcome of
- do_commit ->
- mnesia_recover:note_decision(Tid, committed),
- do_dirty(Tid, Local),
- mnesia_locker:release_tid(Tid),
- ?MODULE ! {delete_transaction, Tid};
- {do_abort, _Reason} ->
- mnesia_recover:note_decision(Tid, aborted)
- end,
- ?eval_debug_fun({?MODULE, multi_commit_sym, post},
- [{tid, Tid}, {outcome, Outcome}]),
- Outcome;
-
-multi_commit(sync_sym_trans, Tid, CR, Store) ->
- %% This protocol is the same as sym_trans except that it
- %% uses syncronized calls to disk_log and syncronized commits
- %% when several nodes are involved.
-
- {DiscNs, RamNs} = commit_nodes(CR, [], []),
- Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
- ?ets_insert(Store, Pending),
-
- {WaitFor, Local} = ask_commit(sync_sym_trans, Tid, CR, DiscNs, RamNs),
- {Outcome, []} = rec_all(WaitFor, Tid, do_commit, []),
- ?eval_debug_fun({?MODULE, multi_commit_sym_sync},
- [{tid, Tid}, {outcome, Outcome}]),
- rpc:abcast(DiscNs -- [node()], ?MODULE, {Tid, Outcome}),
- rpc:abcast(RamNs -- [node()], ?MODULE, {Tid, Outcome}),
- case Outcome of
- do_commit ->
- mnesia_recover:note_decision(Tid, committed),
- mnesia_log:slog(Local),
- do_commit(Tid, Local),
- %% Just wait for completion result is ignore.
- rec_all(WaitFor, Tid, ignore, []),
- mnesia_locker:release_tid(Tid),
- ?MODULE ! {delete_transaction, Tid};
- {do_abort, _Reason} ->
- mnesia_recover:note_decision(Tid, aborted)
- end,
- ?eval_debug_fun({?MODULE, multi_commit_sym, post},
- [{tid, Tid}, {outcome, Outcome}]),
- Outcome;
-
-multi_commit(asym_trans, Tid, CR, Store) ->
- %% This more expensive commit protocol is used when
- %% table definitions are changed (schema transactions).
- %% It is also used when the involved tables are
- %% replicated asymetrically. If the storage type differs
- %% on at least one node this protocol is used.
- %%
- %% 1 Ask the other involved nodes if they want to commit.
- %% All involved nodes prepares the commit, logs a presume_abort
- %% commit record and votes yes or no depending of the
- %% outcome of the prepare. The preparation is also performed
- %% by the coordinator.
- %%
- %% 2a Somebody has died or voted no
- %% Tell all yes voters to do_abort
- %% 2b Everybody has voted yes
- %% Put a unclear marker in the log.
- %% Tell the others to pre_commit. I.e. that they should
- %% put a unclear marker in the log and reply
- %% acc_pre_commit when they are done.
- %%
- %% 3a Somebody died
- %% Tell the remaining participants to do_abort
- %% 3b Everybody has replied acc_pre_commit
- %% Tell everybody to committed. I.e that they should
- %% put a committed marker in the log, perform the updates
- %% and reply done_commit when they are done. The coordinator
- %% must wait with putting his committed marker inte the log
- %% until the committed has been sent to all the others.
- %% Then he performs local commit before collecting replies.
- %%
- %% 4 Everybody has either died or replied done_commit
- %% Return to the caller.
- %%
- %% Recovery:
- %% If the coordinator dies, the participants (and
- %% the coordinator when he starts again) must do
- %% the following:
- %%
- %% If we have no unclear marker in the log we may
- %% safely abort, since we know that nobody may have
- %% decided to commit yet.
- %%
- %% If we have a committed marker in the log we may
- %% safely commit since we know that everybody else
- %% also will come to this conclusion.
- %%
- %% If we have a unclear marker but no committed
- %% in the log we are uncertain about the real outcome
- %% of the transaction and must ask the others before
- %% we can decide what to do. If someone knows the
- %% outcome we will do the same. If nobody knows, we
- %% will wait for the remaining involved nodes to come
- %% up. When all involved nodes are up and uncertain,
- %% we decide to commit (first put a committed marker
- %% in the log, then do the updates).
-
- D = #decision{tid = Tid, outcome = presume_abort},
- {D2, CR2} = commit_decision(D, CR, [], []),
- DiscNs = D2#decision.disc_nodes,
- RamNs = D2#decision.ram_nodes,
- Pending = mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
- ?ets_insert(Store, Pending),
- {WaitFor, Local} = ask_commit(asym_trans, Tid, CR2, DiscNs, RamNs),
- SchemaPrep = (catch mnesia_schema:prepare_commit(Tid, Local, {coord, WaitFor})),
- {Votes, Pids} = rec_all(WaitFor, Tid, do_commit, []),
-
- ?eval_debug_fun({?MODULE, multi_commit_asym_got_votes},
- [{tid, Tid}, {votes, Votes}]),
- case Votes of
- do_commit ->
- case SchemaPrep of
- {_Modified, C, DumperMode} when record(C, commit) ->
- mnesia_log:log(C), % C is not a binary
- ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_rec},
- [{tid, Tid}]),
-
- D3 = C#commit.decision,
- D4 = D3#decision{outcome = unclear},
- mnesia_recover:log_decision(D4),
- ?eval_debug_fun({?MODULE, multi_commit_asym_log_commit_dec},
- [{tid, Tid}]),
- tell_participants(Pids, {Tid, pre_commit}),
- %% Now we are uncertain and we do not know
- %% if all participants have logged that
- %% they are uncertain or not
- rec_acc_pre_commit(Pids, Tid, Store, C,
- do_commit, DumperMode, [], []);
- {'EXIT', Reason} ->
- %% The others have logged the commit
- %% record but they are not uncertain
- mnesia_recover:note_decision(Tid, aborted),
- ?eval_debug_fun({?MODULE, multi_commit_asym_prepare_exit},
- [{tid, Tid}]),
- tell_participants(Pids, {Tid, {do_abort, Reason}}),
- do_abort(Tid, Local),
- {do_abort, Reason}
- end;
-
- {do_abort, Reason} ->
- %% The others have logged the commit
- %% record but they are not uncertain
- mnesia_recover:note_decision(Tid, aborted),
- ?eval_debug_fun({?MODULE, multi_commit_asym_do_abort}, [{tid, Tid}]),
- tell_participants(Pids, {Tid, {do_abort, Reason}}),
- do_abort(Tid, Local),
- {do_abort, Reason}
- end.
-
-%% Returns do_commit or {do_abort, Reason}
-rec_acc_pre_commit([Pid | Tail], Tid, Store, Commit, Res, DumperMode,
- GoodPids, SchemaAckPids) ->
- receive
- {?MODULE, _, {acc_pre_commit, Tid, Pid, true}} ->
- rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
-
- {?MODULE, _, {acc_pre_commit, Tid, Pid, false}} ->
- rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], SchemaAckPids);
-
- {?MODULE, _, {acc_pre_commit, Tid, Pid}} ->
- %% Kept for backwards compatibility. Remove after Mnesia 4.x
- rec_acc_pre_commit(Tail, Tid, Store, Commit, Res, DumperMode,
- [Pid | GoodPids], [Pid | SchemaAckPids]);
-
- {mnesia_down, Node} when Node == node(Pid) ->
- AbortRes = {do_abort, {bad_commit, Node}},
- rec_acc_pre_commit(Tail, Tid, Store, Commit, AbortRes, DumperMode,
- GoodPids, SchemaAckPids)
- end;
-rec_acc_pre_commit([], Tid, Store, Commit, Res, DumperMode, GoodPids, SchemaAckPids) ->
- D = Commit#commit.decision,
- case Res of
- do_commit ->
- %% Now everybody knows that the others
- %% has voted yes. We also know that
- %% everybody are uncertain.
- prepare_sync_schema_commit(Store, SchemaAckPids),
- tell_participants(GoodPids, {Tid, committed}),
- D2 = D#decision{outcome = committed},
- mnesia_recover:log_decision(D2),
- ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_commit},
- [{tid, Tid}]),
-
- %% Now we have safely logged committed
- %% and we can recover without asking others
- do_commit(Tid, Commit, DumperMode),
- ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_commit},
- [{tid, Tid}]),
- sync_schema_commit(Tid, Store, SchemaAckPids),
- mnesia_locker:release_tid(Tid),
- ?MODULE ! {delete_transaction, Tid};
-
- {do_abort, Reason} ->
- tell_participants(GoodPids, {Tid, {do_abort, Reason}}),
- D2 = D#decision{outcome = aborted},
- mnesia_recover:log_decision(D2),
- ?eval_debug_fun({?MODULE, rec_acc_pre_commit_log_abort},
- [{tid, Tid}]),
- do_abort(Tid, Commit),
- ?eval_debug_fun({?MODULE, rec_acc_pre_commit_done_abort},
- [{tid, Tid}])
- end,
- Res.
-
-%% Note all nodes in case of mnesia_down mgt
-prepare_sync_schema_commit(_Store, []) ->
- ok;
-prepare_sync_schema_commit(Store, [Pid | Pids]) ->
- ?ets_insert(Store, {waiting_for_commit_ack, node(Pid)}),
- prepare_sync_schema_commit(Store, Pids).
-
-sync_schema_commit(_Tid, _Store, []) ->
- ok;
-sync_schema_commit(Tid, Store, [Pid | Tail]) ->
- receive
- {?MODULE, _, {schema_commit, Tid, Pid}} ->
- ?ets_match_delete(Store, {waiting_for_commit_ack, node(Pid)}),
- sync_schema_commit(Tid, Store, Tail);
-
- {mnesia_down, Node} when Node == node(Pid) ->
- ?ets_match_delete(Store, {waiting_for_commit_ack, Node}),
- sync_schema_commit(Tid, Store, Tail)
- end.
-
-tell_participants([Pid | Pids], Msg) ->
- Pid ! Msg,
- tell_participants(Pids, Msg);
-tell_participants([], _Msg) ->
- ok.
-
-%% No need for trapping exits. We are only linked
-%% to mnesia_tm and if it dies we should also die.
-%% The same goes for disk_log and dets.
-commit_participant(Coord, Tid, Bin, DiscNs, RamNs) when binary(Bin) ->
- Commit = binary_to_term(Bin),
- commit_participant(Coord, Tid, Bin, Commit, DiscNs, RamNs);
-commit_participant(Coord, Tid, C, DiscNs, RamNs) when record(C, commit) ->
- commit_participant(Coord, Tid, C, C, DiscNs, RamNs).
-
-commit_participant(Coord, Tid, Bin, C0, DiscNs, _RamNs) ->
- ?eval_debug_fun({?MODULE, commit_participant, pre}, [{tid, Tid}]),
- case catch mnesia_schema:prepare_commit(Tid, C0, {part, Coord}) of
- {Modified, C, DumperMode} when record(C, commit) ->
- %% If we can not find any local unclear decision
- %% we should presume abort at startup recovery
- case lists:member(node(), DiscNs) of
- false ->
- ignore;
- true ->
- case Modified of
- false -> mnesia_log:log(Bin);
- true -> mnesia_log:log(C)
- end
- end,
- ?eval_debug_fun({?MODULE, commit_participant, vote_yes},
- [{tid, Tid}]),
- reply(Coord, {vote_yes, Tid, self()}),
-
- receive
- {Tid, pre_commit} ->
- D = C#commit.decision,
- mnesia_recover:log_decision(D#decision{outcome = unclear}),
- ?eval_debug_fun({?MODULE, commit_participant, pre_commit},
- [{tid, Tid}]),
- Expect_schema_ack = C#commit.schema_ops /= [],
- reply(Coord, {acc_pre_commit, Tid, self(), Expect_schema_ack}),
-
- %% Now we are vulnerable for failures, since
- %% we cannot decide without asking others
- receive
- {Tid, committed} ->
- mnesia_recover:log_decision(D#decision{outcome = committed}),
- ?eval_debug_fun({?MODULE, commit_participant, log_commit},
- [{tid, Tid}]),
- do_commit(Tid, C, DumperMode),
- case Expect_schema_ack of
- false -> ignore;
- true -> reply(Coord, {schema_commit, Tid, self()})
- end,
- ?eval_debug_fun({?MODULE, commit_participant, do_commit},
- [{tid, Tid}]);
-
- {Tid, {do_abort, _Reason}} ->
- mnesia_recover:log_decision(D#decision{outcome = aborted}),
- ?eval_debug_fun({?MODULE, commit_participant, log_abort},
- [{tid, Tid}]),
- mnesia_schema:undo_prepare_commit(Tid, C),
- ?eval_debug_fun({?MODULE, commit_participant, undo_prepare},
- [{tid, Tid}]);
-
- {'EXIT', _, _} ->
- mnesia_recover:log_decision(D#decision{outcome = aborted}),
- ?eval_debug_fun({?MODULE, commit_participant, exit_log_abort},
- [{tid, Tid}]),
- mnesia_schema:undo_prepare_commit(Tid, C),
- ?eval_debug_fun({?MODULE, commit_participant, exit_undo_prepare},
- [{tid, Tid}]);
-
- Msg ->
- verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~p~n",
- [Tid, Msg])
- end;
- {Tid, {do_abort, _Reason}} ->
- mnesia_schema:undo_prepare_commit(Tid, C),
- ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare},
- [{tid, Tid}]);
-
- {'EXIT', _, _} ->
- mnesia_schema:undo_prepare_commit(Tid, C),
- ?eval_debug_fun({?MODULE, commit_participant, pre_commit_undo_prepare}, [{tid, Tid}]);
-
- Msg ->
- verbose("** ERROR ** commit_participant ~p, got unexpected msg: ~p~n",
- [Tid, Msg])
- end;
-
- {'EXIT', Reason} ->
- ?eval_debug_fun({?MODULE, commit_participant, vote_no},
- [{tid, Tid}]),
- reply(Coord, {vote_no, Tid, Reason}),
- mnesia_schema:undo_prepare_commit(Tid, C0)
- end,
- mnesia_locker:release_tid(Tid),
- ?MODULE ! {delete_transaction, Tid},
- unlink(whereis(?MODULE)),
- exit(normal).
-
-do_abort(Tid, Bin) when binary(Bin) ->
- %% Possible optimization:
- %% If we want we could pass arround a flag
- %% that tells us whether the binary contains
- %% schema ops or not. Only if the binary
- %% contains schema ops there are meningful
- %% unpack the binary and perform
- %% mnesia_schema:undo_prepare_commit/1.
- do_abort(Tid, binary_to_term(Bin));
-do_abort(Tid, Commit) ->
- mnesia_schema:undo_prepare_commit(Tid, Commit),
- Commit.
-
-do_dirty(Tid, Commit) when Commit#commit.schema_ops == [] ->
- mnesia_log:log(Commit),
- do_commit(Tid, Commit).
-
-%% do_commit(Tid, CommitRecord)
-do_commit(Tid, Bin) when binary(Bin) ->
- do_commit(Tid, binary_to_term(Bin));
-do_commit(Tid, C) ->
- do_commit(Tid, C, optional).
-do_commit(Tid, Bin, DumperMode) when binary(Bin) ->
- do_commit(Tid, binary_to_term(Bin), DumperMode);
-do_commit(Tid, C, DumperMode) ->
- mnesia_dumper:update(Tid, C#commit.schema_ops, DumperMode),
- R = do_snmp(Tid, C#commit.snmp),
- R2 = do_update(Tid, ram_copies, C#commit.ram_copies, R),
- R3 = do_update(Tid, disc_copies, C#commit.disc_copies, R2),
- do_update(Tid, disc_only_copies, C#commit.disc_only_copies, R3).
-
-%% Update the items
-do_update(Tid, Storage, [Op | Ops], OldRes) ->
- case catch do_update_op(Tid, Storage, Op) of
- ok ->
- do_update(Tid, Storage, Ops, OldRes);
- {'EXIT', Reason} ->
- %% This may only happen when we recently have
- %% deleted our local replica, changed storage_type
- %% or transformed table
- %% BUGBUG: Updates may be lost if storage_type is changed.
- %% Determine actual storage type and try again.
- %% BUGBUG: Updates may be lost if table is transformed.
-
- verbose("do_update in ~w failed: ~p -> {'EXIT', ~p}~n",
- [Tid, Op, Reason]),
- do_update(Tid, Storage, Ops, OldRes);
- NewRes ->
- do_update(Tid, Storage, Ops, NewRes)
- end;
-do_update(_Tid, _Storage, [], Res) ->
- Res.
-
-do_update_op(Tid, Storage, {{Tab, K}, Obj, write}) ->
- commit_write(?catch_val({Tab, commit_work}), Tid,
- Tab, K, Obj, undefined),
- mnesia_lib:db_put(Storage, Tab, Obj);
-
-do_update_op(Tid, Storage, {{Tab, K}, Val, delete}) ->
- commit_delete(?catch_val({Tab, commit_work}), Tid, Tab, K, Val, undefined),
- mnesia_lib:db_erase(Storage, Tab, K);
-
-do_update_op(Tid, Storage, {{Tab, K}, {RecName, Incr}, update_counter}) ->
- {NewObj, OldObjs} =
- case catch mnesia_lib:db_update_counter(Storage, Tab, K, Incr) of
- NewVal when integer(NewVal), NewVal >= 0 ->
- {{RecName, K, NewVal}, [{RecName, K, NewVal - Incr}]};
- _ ->
- Zero = {RecName, K, 0},
- mnesia_lib:db_put(Storage, Tab, Zero),
- {Zero, []}
- end,
- commit_update(?catch_val({Tab, commit_work}), Tid, Tab,
- K, NewObj, OldObjs),
- element(3, NewObj);
-
-do_update_op(Tid, Storage, {{Tab, Key}, Obj, delete_object}) ->
- commit_del_object(?catch_val({Tab, commit_work}),
- Tid, Tab, Key, Obj, undefined),
- mnesia_lib:db_match_erase(Storage, Tab, Obj);
-
-do_update_op(Tid, Storage, {{Tab, Key}, Obj, clear_table}) ->
- commit_clear(?catch_val({Tab, commit_work}), Tid, Tab, Key, Obj),
- mnesia_lib:db_match_erase(Storage, Tab, Obj).
-
-commit_write([], _, _, _, _, _) -> ok;
-commit_write([{checkpoints, CpList}|R], Tid, Tab, K, Obj, Old) ->
- mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
- commit_write(R, Tid, Tab, K, Obj, Old);
-commit_write([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
- commit_write(R, Tid, Tab, K, Obj, Old);
-commit_write([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == index ->
- mnesia_index:add_index(H, Tab, K, Obj, Old),
- commit_write(R, Tid, Tab, K, Obj, Old).
-
-commit_update([], _, _, _, _, _) -> ok;
-commit_update([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
- Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, write, CpList),
- commit_update(R, Tid, Tab, K, Obj, Old);
-commit_update([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, write, Old),
- commit_update(R, Tid, Tab, K, Obj, Old);
-commit_update([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == index ->
- mnesia_index:add_index(H, Tab, K, Obj, Old),
- commit_update(R, Tid, Tab, K, Obj, Old).
-
-commit_delete([], _, _, _, _, _) -> ok;
-commit_delete([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
- Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete, CpList),
- commit_delete(R, Tid, Tab, K, Obj, Old);
-commit_delete([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete, Old),
- commit_delete(R, Tid, Tab, K, Obj, Old);
-commit_delete([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == index ->
- mnesia_index:delete_index(H, Tab, K),
- commit_delete(R, Tid, Tab, K, Obj, Old).
-
-commit_del_object([], _, _, _, _, _) -> ok;
-commit_del_object([{checkpoints, CpList}|R], Tid, Tab, K, Obj, _) ->
- Old = mnesia_checkpoint:tm_retain(Tid, Tab, K, delete_object, CpList),
- commit_del_object(R, Tid, Tab, K, Obj, Old);
-commit_del_object([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, delete_object, Old),
- commit_del_object(R, Tid, Tab, K, Obj, Old);
-commit_del_object([H|R], Tid, Tab, K, Obj, Old)
- when element(1, H) == index ->
- mnesia_index:del_object_index(H, Tab, K, Obj, Old),
- commit_del_object(R, Tid, Tab, K, Obj, Old).
-
-commit_clear([], _, _, _, _) -> ok;
-commit_clear([{checkpoints, CpList}|R], Tid, Tab, K, Obj) ->
- mnesia_checkpoint:tm_retain(Tid, Tab, K, clear_table, CpList),
- commit_clear(R, Tid, Tab, K, Obj);
-commit_clear([H|R], Tid, Tab, K, Obj)
- when element(1, H) == subscribers ->
- mnesia_subscr:report_table_event(H, Tab, Tid, Obj, clear_table, undefined),
- commit_clear(R, Tid, Tab, K, Obj);
-commit_clear([H|R], Tid, Tab, K, Obj)
- when element(1, H) == index ->
- mnesia_index:clear_index(H, Tab, K, Obj),
- commit_clear(R, Tid, Tab, K, Obj).
-
-do_snmp(_, []) -> ok;
-do_snmp(Tid, [Head | Tail]) ->
- case catch mnesia_snmp_hook:update(Head) of
- {'EXIT', Reason} ->
- %% This should only happen when we recently have
- %% deleted our local replica or recently deattached
- %% the snmp table
-
- verbose("do_snmp in ~w failed: ~p -> {'EXIT', ~p}~n",
- [Tid, Head, Reason]);
- ok ->
- ignore
- end,
- do_snmp(Tid, Tail).
-
-commit_nodes([C | Tail], AccD, AccR)
- when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [],
- C#commit.schema_ops == [] ->
- commit_nodes(Tail, AccD, [C#commit.node | AccR]);
-commit_nodes([C | Tail], AccD, AccR) ->
- commit_nodes(Tail, [C#commit.node | AccD], AccR);
-commit_nodes([], AccD, AccR) ->
- {AccD, AccR}.
-
-commit_decision(D, [C | Tail], AccD, AccR) ->
- N = C#commit.node,
- {D2, Tail2} =
- case C#commit.schema_ops of
- [] when C#commit.disc_copies == [],
- C#commit.disc_only_copies == [] ->
- commit_decision(D, Tail, AccD, [N | AccR]);
- [] ->
- commit_decision(D, Tail, [N | AccD], AccR);
- Ops ->
- case ram_only_ops(N, Ops) of
- true ->
- commit_decision(D, Tail, AccD, [N | AccR]);
- false ->
- commit_decision(D, Tail, [N | AccD], AccR)
- end
- end,
- {D2, [C#commit{decision = D2} | Tail2]};
-commit_decision(D, [], AccD, AccR) ->
- {D#decision{disc_nodes = AccD, ram_nodes = AccR}, []}.
-
-ram_only_ops(N, [{op, change_table_copy_type, N, _FromS, _ToS, Cs} | _Ops ]) ->
- case lists:member({name, schema}, Cs) of
- true ->
- %% We always use disk if change type of the schema
- false;
- false ->
- not lists:member(N, val({schema, disc_copies}))
- end;
-
-ram_only_ops(N, _Ops) ->
- not lists:member(N, val({schema, disc_copies})).
-
-%% Returns {WaitFor, Res}
-sync_send_dirty(Tid, [Head | Tail], Tab, WaitFor) ->
- Node = Head#commit.node,
- if
- Node == node() ->
- {WF, _} = sync_send_dirty(Tid, Tail, Tab, WaitFor),
- Res = do_dirty(Tid, Head),
- {WF, Res};
- true ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
- sync_send_dirty(Tid, Tail, Tab, [Node | WaitFor])
- end;
-sync_send_dirty(_Tid, [], _Tab, WaitFor) ->
- {WaitFor, {'EXIT', {aborted, {node_not_running, WaitFor}}}}.
-
-%% Returns {WaitFor, Res}
-async_send_dirty(_Tid, _Nodes, Tab, nowhere) ->
- {[], {'EXIT', {aborted, {no_exists, Tab}}}};
-async_send_dirty(Tid, Nodes, Tab, ReadNode) ->
- async_send_dirty(Tid, Nodes, Tab, ReadNode, [], ok).
-
-async_send_dirty(Tid, [Head | Tail], Tab, ReadNode, WaitFor, Res) ->
- Node = Head#commit.node,
- if
- ReadNode == Node, Node == node() ->
- NewRes = do_dirty(Tid, Head),
- async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, NewRes);
- ReadNode == Node ->
- {?MODULE, Node} ! {self(), {sync_dirty, Tid, Head, Tab}},
- NewRes = {'EXIT', {aborted, {node_not_running, Node}}},
- async_send_dirty(Tid, Tail, Tab, ReadNode, [Node | WaitFor], NewRes);
- true ->
- {?MODULE, Node} ! {self(), {async_dirty, Tid, Head, Tab}},
- async_send_dirty(Tid, Tail, Tab, ReadNode, WaitFor, Res)
- end;
-async_send_dirty(_Tid, [], _Tab, _ReadNode, WaitFor, Res) ->
- {WaitFor, Res}.
-
-rec_dirty([Node | Tail], Res) when Node /= node() ->
- NewRes = get_dirty_reply(Node, Res),
- rec_dirty(Tail, NewRes);
-rec_dirty([], Res) ->
- Res.
-
-get_dirty_reply(Node, Res) ->
- receive
- {?MODULE, Node, {'EXIT', Reason}} ->
- {'EXIT', {aborted, {badarg, Reason}}};
- {?MODULE, Node, {dirty_res, ok}} ->
- case Res of
- {'EXIT', {aborted, {node_not_running, _Node}}} ->
- ok;
- _ ->
- %% Prioritize bad results, but node_not_running
- Res
- end;
- {?MODULE, Node, {dirty_res, Reply}} ->
- Reply;
- {mnesia_down, Node} ->
- %% It's ok to ignore mnesia_down's
- %% since we will make the replicas
- %% consistent again when Node is started
- Res
- after 1000 ->
- case lists:member(Node, val({current, db_nodes})) of
- true ->
- get_dirty_reply(Node, Res);
- false ->
- Res
- end
- end.
-
-%% Assume that CommitRecord is no binary
-%% Return {Res, Pids}
-ask_commit(Protocol, Tid, CR, DiscNs, RamNs) ->
- ask_commit(Protocol, Tid, CR, DiscNs, RamNs, [], no_local).
-
-ask_commit(Protocol, Tid, [Head | Tail], DiscNs, RamNs, WaitFor, Local) ->
- Node = Head#commit.node,
- if
- Node == node() ->
- ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, WaitFor, Head);
- true ->
- Bin = opt_term_to_binary(Protocol, Head, DiscNs++RamNs),
- Msg = {ask_commit, Protocol, Tid, Bin, DiscNs, RamNs},
- {?MODULE, Node} ! {self(), Msg},
- ask_commit(Protocol, Tid, Tail, DiscNs, RamNs, [Node | WaitFor], Local)
- end;
-ask_commit(_Protocol, _Tid, [], _DiscNs, _RamNs, WaitFor, Local) ->
- {WaitFor, Local}.
-
-opt_term_to_binary(asym_trans, Head, Nodes) ->
- opt_term_to_binary(Nodes, Head);
-opt_term_to_binary(_Protocol, Head, _Nodes) ->
- Head.
-
-opt_term_to_binary([], Head) ->
- term_to_binary(Head);
-opt_term_to_binary([H|R], Head) ->
- case mnesia_monitor:needs_protocol_conversion(H) of
- true -> Head;
- false ->
- opt_term_to_binary(R, Head)
- end.
-
-rec_all([Node | Tail], Tid, Res, Pids) ->
- receive
- {?MODULE, Node, {vote_yes, Tid}} ->
- rec_all(Tail, Tid, Res, Pids);
- {?MODULE, Node, {vote_yes, Tid, Pid}} ->
- rec_all(Tail, Tid, Res, [Pid | Pids]);
- {?MODULE, Node, {vote_no, Tid, Reason}} ->
- rec_all(Tail, Tid, {do_abort, Reason}, Pids);
- {?MODULE, Node, {committed, Tid}} ->
- rec_all(Tail, Tid, Res, Pids);
- {?MODULE, Node, {aborted, Tid}} ->
- rec_all(Tail, Tid, Res, Pids);
-
- {mnesia_down, Node} ->
- rec_all(Tail, Tid, {do_abort, {bad_commit, Node}}, Pids)
- end;
-rec_all([], _Tid, Res, Pids) ->
- {Res, Pids}.
-
-get_transactions() ->
- {info, Participant, Coordinator} = req(info),
- lists:map(fun({Tid, _Tabs}) ->
- Status = tr_status(Tid,Participant),
- {Tid#tid.counter, Tid#tid.pid, Status}
- end,Coordinator).
-
-tr_status(Tid,Participant) ->
- case lists:keymember(Tid, 1, Participant) of
- true -> participant;
- false -> coordinator
- end.
-
-get_info(Timeout) ->
- case whereis(?MODULE) of
- undefined ->
- {timeout, Timeout};
- Pid ->
- Pid ! {self(), info},
- receive
- {?MODULE, _, {info, Part, Coord}} ->
- {info, Part, Coord}
- after Timeout ->
- {timeout, Timeout}
- end
- end.
-
-display_info(Stream, {timeout, T}) ->
- io:format(Stream, "---> No info about coordinator and participant transactions, "
- "timeout ~p <--- ~n", [T]);
-
-display_info(Stream, {info, Part, Coord}) ->
- io:format(Stream, "---> Participant transactions <--- ~n", []),
- lists:foreach(fun(P) -> pr_participant(Stream, P) end, Part),
- io:format(Stream, "---> Coordinator transactions <---~n", []),
- lists:foreach(fun({Tid, _Tabs}) -> pr_tid(Stream, Tid) end, Coord).
-
-pr_participant(Stream, P) ->
- Commit0 = P#participant.commit,
- Commit =
- if
- binary(Commit0) -> binary_to_term(Commit0);
- true -> Commit0
- end,
- pr_tid(Stream, P#participant.tid),
- io:format(Stream, "with participant objects ~p~n", [Commit]).
-
-
-pr_tid(Stream, Tid) ->
- io:format(Stream, "Tid: ~p (owned by ~p) ~n",
- [Tid#tid.counter, Tid#tid.pid]).
-
-info(Serial) ->
- io:format( "Info about transaction with serial == ~p~n", [Serial]),
- {info, Participant, Trs} = req(info),
- search_pr_participant(Serial, Participant),
- search_pr_coordinator(Serial, Trs).
-
-
-search_pr_coordinator(_S, []) -> no;
-search_pr_coordinator(S, [{Tid, _Ts}|Tail]) ->
- case Tid#tid.counter of
- S ->
- io:format( "Tid is coordinator, owner == \n", []),
- display_pid_info(Tid#tid.pid),
- search_pr_coordinator(S, Tail);
- _ ->
- search_pr_coordinator(S, Tail)
- end.
-
-search_pr_participant(_S, []) ->
- false;
-search_pr_participant(S, [ P | Tail]) ->
- Tid = P#participant.tid,
- Commit0 = P#participant.commit,
- if
- Tid#tid.counter == S ->
- io:format( "Tid is participant to commit, owner == \n", []),
- Pid = Tid#tid.pid,
- display_pid_info(Pid),
- io:format( "Tid wants to write objects \n",[]),
- Commit =
- if
- binary(Commit0) -> binary_to_term(Commit0);
- true -> Commit0
- end,
-
- io:format("~p~n", [Commit]),
- search_pr_participant(S,Tail); %% !!!!!
- true ->
- search_pr_participant(S, Tail)
- end.
-
-display_pid_info(Pid) ->
- case rpc:pinfo(Pid) of
- undefined ->
- io:format( "Dead process \n");
- Info ->
- Call = fetch(initial_call, Info),
- Curr = case fetch(current_function, Info) of
- {Mod,F,Args} when list(Args) ->
- {Mod,F,length(Args)};
- Other ->
- Other
- end,
- Reds = fetch(reductions, Info),
- LM = length(fetch(messages, Info)),
- pformat(io_lib:format("~p", [Pid]),
- io_lib:format("~p", [Call]),
- io_lib:format("~p", [Curr]), Reds, LM)
- end.
-
-pformat(A1, A2, A3, A4, A5) ->
- io:format( "~-12s ~-21s ~-21s ~9w ~4w~n", [A1,A2,A3,A4,A5]).
-
-fetch(Key, Info) ->
- case lists:keysearch(Key, 1, Info) of
- {value, {_, Val}} ->
- Val;
- _ ->
- 0
- end.
-
-
-%%%%%%%%%%%%%%%%%%%%
-%%%%%%%%%%%%%%%%%%%%% reconfigure stuff comes here ......
-%%%%%%%%%%%%%%%%%%%%%
-
-reconfigure_coordinators(N, [{Tid, [Store | _]} | Coordinators]) ->
- case mnesia_recover:outcome(Tid, unknown) of
- committed ->
- WaitingNodes = ?ets_lookup(Store, waiting_for_commit_ack),
- case lists:keymember(N, 2, WaitingNodes) of
- false ->
- ignore; % avoid spurious mnesia_down messages
- true ->
- send_mnesia_down(Tid, Store, N)
- end;
- aborted ->
- ignore; % avoid spurious mnesia_down messages
- _ ->
- %% Tell the coordinator about the mnesia_down
- send_mnesia_down(Tid, Store, N)
- end,
- reconfigure_coordinators(N, Coordinators);
-reconfigure_coordinators(_N, []) ->
- ok.
-
-send_mnesia_down(Tid, Store, Node) ->
- Msg = {mnesia_down, Node},
- send_to_pids([Tid#tid.pid | get_friends(Store)], Msg).
-
-send_to_pids([Pid | Pids], Msg) ->
- Pid ! Msg,
- send_to_pids(Pids, Msg);
-send_to_pids([], _Msg) ->
- ok.
-
-reconfigure_participants(N, [P | Tail]) ->
- case lists:member(N, P#participant.disc_nodes) or
- lists:member(N, P#participant.ram_nodes) of
- false ->
- %% Ignore, since we are not a participant
- %% in the transaction.
- reconfigure_participants(N, Tail);
-
- true ->
- %% We are on a participant node, lets
- %% check if the dead one was a
- %% participant or a coordinator.
- Tid = P#participant.tid,
- if
- node(Tid#tid.pid) /= N ->
- %% Another participant node died. Ignore.
- reconfigure_participants(N, Tail);
-
- true ->
- %% The coordinator node has died and
- %% we must determine the outcome of the
- %% transaction and tell mnesia_tm on all
- %% nodes (including the local node) about it
- verbose("Coordinator ~p in transaction ~p died~n",
- [Tid#tid.pid, Tid]),
-
- Nodes = P#participant.disc_nodes ++
- P#participant.ram_nodes,
- AliveNodes = Nodes -- [N],
- Protocol = P#participant.protocol,
- tell_outcome(Tid, Protocol, N, AliveNodes, AliveNodes),
- reconfigure_participants(N, Tail)
- end
- end;
-reconfigure_participants(_, []) ->
- [].
-
-%% We need to determine the outcome of the transaction and
-%% tell mnesia_tm on all involved nodes (including the local node)
-%% about the outcome.
-tell_outcome(Tid, Protocol, Node, CheckNodes, TellNodes) ->
- Outcome = mnesia_recover:what_happened(Tid, Protocol, CheckNodes),
- case Outcome of
- aborted ->
- rpc:abcast(TellNodes, ?MODULE, {Tid,{do_abort, {mnesia_down, Node}}});
- committed ->
- rpc:abcast(TellNodes, ?MODULE, {Tid, do_commit})
- end,
- Outcome.
-
-do_stop(#state{coordinators = Coordinators}) ->
- Msg = {mnesia_down, node()},
- lists:foreach(fun({Tid, _}) -> Tid#tid.pid ! Msg end, Coordinators),
- mnesia_checkpoint:stop(),
- mnesia_log:stop(),
- exit(shutdown).
-
-%%%%%%%%%%%%%%%%%%%%%%%%%%%
-%% System upgrade
-
-system_continue(_Parent, _Debug, State) ->
- doit_loop(State).
-
-system_terminate(_Reason, _Parent, _Debug, State) ->
- do_stop(State).
-
-system_code_change(State, _Module, _OldVsn, _Extra) ->
- {ok, State}.