aboutsummaryrefslogtreecommitdiffstats
path: root/lib/mnesia/examples/mnesia_tpcb.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/mnesia/examples/mnesia_tpcb.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/mnesia/examples/mnesia_tpcb.erl')
-rw-r--r--lib/mnesia/examples/mnesia_tpcb.erl1268
1 files changed, 1268 insertions, 0 deletions
diff --git a/lib/mnesia/examples/mnesia_tpcb.erl b/lib/mnesia/examples/mnesia_tpcb.erl
new file mode 100644
index 0000000000..903c53a21c
--- /dev/null
+++ b/lib/mnesia/examples/mnesia_tpcb.erl
@@ -0,0 +1,1268 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%
+%% MODULE
+%%
+%% mnesia_tpcb - TPC-B benchmarking of Mnesia
+%%
+%% DESCRIPTION
+%%
+%% The metrics used in the TPC-B benchmark are throughput as measured
+%% in transactions per second (TPS). The benchmark uses a single,
+%% simple update-intensive transaction to load the database system.
+%% The single transaction type provides a simple, repeatable
+%% unit of work, and is designed to exercise the basic components of
+%% a database system.
+%%
+%% The definition of the TPC-B states lots of detailed rules and
+%% conditions that must be fullfilled, e.g. how the ACID (atomicity,
+%% consistency, isolation and durability) properties are verified,
+%% how the random numbers must be distributed, minimum sizes of
+%% the different types of records, minimum duration of the benchmark,
+%% formulas to calculate prices (dollars per tps), disclosure issues
+%% etc. Please, see http://www.tpc.org/ about the nitty gritty details.
+%%
+%% The TPC-B benchmark is stated in terms of a hypothetical bank. The
+%% bank has one or more branches. Each branch has multiple tellers. The
+%% bank has many customers, each with an account. The database represents
+%% the cash position of each entity (branch, teller and account) and a
+%% history of recent transactions run by the bank. The transaction
+%% represents the work done when a customer makes a deposit or a
+%% withdrawal against his account. The transaction is performed by a
+%% teller at some branch.
+%%
+%% Each process that performs TPC-B transactions is called a driver.
+%% Drivers generates teller_id, account_id and delta amount of
+%% money randomly. An account, a teller and a branch are read, their
+%% balances are adjusted and a history record is created. The driver
+%% measures the time for 3 reads, 3 writes and 1 create.
+%%
+%% GETTING STARTED
+%%
+%% Generate tables and run with default configuration:
+%%
+%% mnesia_tpcb:start().
+%%
+%% A little bit more advanced;
+%%
+%% spawn(mnesia_tpcb, start, [[[{n_drivers_per_node, 8}, {stop_after, infinity}]]),
+%% mnesia_tpcb:stop().
+%%
+%% Really advanced;
+%%
+%% mnesia_tpcb:init(([{n_branches, 8}, {replica_type, disc_only_copies}]),
+%% mnesia_tpcb:run(([{n_drivers_per_node, 8}]),
+%% mnesia_tpcb:run(([{n_drivers_per_node, 64}]).
+%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-module(mnesia_tpcb).
+-author('[email protected]').
+
+-export([
+ config/2,
+ count_balance/0,
+ driver_init/2,
+ init/1,
+ reporter_init/2,
+ run/1,
+ start/0,
+ start/1,
+ start/2,
+ stop/0,
+ real_trans/5,
+ verify_tabs/0,
+ reply_gen_branch/3,
+ frag_add_delta/7,
+
+ conflict_test/1,
+ dist_test/1,
+ replica_test/1,
+ sticky_replica_test/1,
+ remote_test/1,
+ remote_frag2_test/1
+ ]).
+
+-define(SECOND, 1000000).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Account record, total size must be at least 100 bytes
+
+-define(ACCOUNT_FILLER,
+ {123456789012345678901234567890123456789012345678901234567890,
+ 123456789012345678901234567890123456789012345678901234567890,
+ 123456789012345678901234567890123456789012345678901234}).
+
+-record(account,
+ {
+ id = 0, % Unique account id
+ branch_id = 0, % Branch where the account is held
+ balance = 0, % Account balance
+ filler = ?ACCOUNT_FILLER % Gap filler to ensure size >= 100 bytes
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Branch record, total size must be at least 100 bytes
+
+-define(BRANCH_FILLER,
+ {123456789012345678901234567890123456789012345678901234567890,
+ 123456789012345678901234567890123456789012345678901234567890,
+ 123456789012345678901234567890123456789012345678901234567890}).
+
+-record(branch,
+ {
+ id = 0, % Unique branch id
+ balance = 0, % Total balance of whole branch
+ filler = ?BRANCH_FILLER % Gap filler to ensure size >= 100 bytes
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Teller record, total size must be at least 100 bytes
+
+-define(TELLER_FILLER,
+ {123456789012345678901234567890123456789012345678901234567890,
+ 123456789012345678901234567890123456789012345678901234567890,
+ 1234567890123456789012345678901234567890123456789012345678}).
+
+-record(teller,
+ {
+ id = 0, % Unique teller id
+ branch_id = 0, % Branch where the teller is located
+ balance = 0, % Teller balance
+ filler = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% History record, total size must be at least 50 bytes
+
+-define(HISTORY_FILLER, 1234567890).
+
+-record(history,
+ {
+ history_id = {0, 0}, % {DriverId, DriverLocalHistoryid}
+ time_stamp = now(), % Time point during active transaction
+ branch_id = 0, % Branch associated with teller
+ teller_id = 0, % Teller invlolved in transaction
+ account_id = 0, % Account updated by transaction
+ amount = 0, % Amount (delta) specified by transaction
+ filler = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+-record(tab_config,
+ {
+ db_nodes = [node()],
+ n_replicas = 1, % Ignored for non-fragmented tables
+ replica_nodes = [node()],
+ replica_type = ram_copies,
+ use_running_mnesia = false,
+ n_fragments = 0,
+ n_branches = 1,
+ n_tellers_per_branch = 10, % Must be 10
+ n_accounts_per_branch = 100000, % Must be 100000
+ branch_filler = ?BRANCH_FILLER,
+ account_filler = ?ACCOUNT_FILLER,
+ teller_filler = ?TELLER_FILLER
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(run_config,
+ {
+ driver_nodes = [node()],
+ n_drivers_per_node = 1,
+ use_running_mnesia = false,
+ stop_after = timer:minutes(15), % Minimum 15 min
+ report_interval = timer:minutes(1),
+ use_sticky_locks = false,
+ spawn_near_branch = false,
+ activity_type = transaction,
+ reuse_history_id = false
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(time,
+ {
+ n_trans = 0,
+ min_n = 0,
+ max_n = 0,
+ acc_time = 0,
+ max_time = 0
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(driver_state,
+ {
+ driver_id,
+ driver_node,
+ seed,
+ n_local_branches,
+ local_branches,
+ tab_config,
+ run_config,
+ history_id,
+ time = #time{},
+ acc_time = #time{},
+ reuse_history_id
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-record(reporter_state,
+ {
+ driver_pids,
+ starter_pid,
+ n_iters = 0,
+ prev_tps = 0,
+ curr = #time{},
+ acc = #time{},
+ init_micros,
+ prev_micros,
+ run_config
+ }).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on each node, table not replicated
+
+config(frag_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {n_branches, length(Nodes)},
+ {n_fragments, length(Nodes)},
+ {replica_nodes, Nodes},
+ {db_nodes, Nodes},
+ {driver_nodes, Nodes},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on each node, table replicated to two nodes.
+
+config(frag2_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {n_branches, length(Nodes)},
+ {n_fragments, length(Nodes)},
+ {n_replicas, 2},
+ {replica_nodes, Nodes},
+ {db_nodes, Nodes},
+ {driver_nodes, Nodes},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on this node, table replicated to all nodes.
+
+config(replica_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {db_nodes, Nodes},
+ {driver_nodes, [Local]},
+ {replica_nodes, Nodes},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on this node, table replicated to all nodes.
+
+config(sticky_replica_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {db_nodes, Nodes},
+ {driver_nodes, [node()]},
+ {replica_nodes, Nodes},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {use_sticky_locks, true},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Ten drivers per node, tables replicated to all nodes, lots of branches
+
+config(dist_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {db_nodes, Nodes},
+ {driver_nodes, Nodes},
+ {replica_nodes, Nodes},
+ {n_drivers_per_node, 10},
+ {n_branches, 10 * length(Nodes) * 100},
+ {n_accounts_per_branch, 10},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Ten drivers per node, tables replicated to all nodes, single branch
+
+config(conflict_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {db_nodes, Nodes},
+ {driver_nodes, Nodes},
+ {replica_nodes, Nodes},
+ {n_drivers_per_node, 10},
+ {n_branches, 1},
+ {n_accounts_per_branch, 10},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on this node, table replicated to all other nodes.
+
+config(remote_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {db_nodes, Nodes},
+ {driver_nodes, [Local]},
+ {replica_nodes, Remote},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ];
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% One driver on this node, table replicated to two other nodes.
+
+config(remote_frag2_test, ReplicaType) ->
+ Remote = nodes(),
+ Local = node(),
+ Nodes = [Local | Remote],
+ [
+ {n_branches, length(Remote)},
+ {n_fragments, length(Remote)},
+ {n_replicas, 2},
+ {replica_nodes, Remote},
+ {db_nodes, Nodes},
+ {driver_nodes, [Local]},
+ {n_accounts_per_branch, 100},
+ {replica_type, ReplicaType},
+ {stop_after, timer:minutes(1)},
+ {report_interval, timer:seconds(10)},
+ {reuse_history_id, true}
+ ].
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+start(What, ReplicaType) ->
+ spawn_link(?MODULE, start, [config(What, ReplicaType)]).
+
+replica_test(ReplicaType) ->
+ start(replica_test, ReplicaType).
+
+sticky_replica_test(ReplicaType) ->
+ start(sticky_replica_test, ReplicaType).
+
+dist_test(ReplicaType) ->
+ start(dist_test, ReplicaType).
+
+conflict_test(ReplicaType) ->
+ start(conflict_test, ReplicaType).
+
+remote_test(ReplicaType) ->
+ start(remote_test, ReplicaType).
+
+remote_frag2_test(ReplicaType) ->
+ start(remote_frag2_test, ReplicaType).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Args is a list of {Key, Val} tuples where Key is a field name
+%% in either the record tab_config or run_config. Unknown keys are ignored.
+
+start() ->
+ start([]).
+start(Args) ->
+ init(Args),
+ run(Args).
+
+list2rec(List, Fields, DefaultTuple) ->
+ [Name|Defaults] = tuple_to_list(DefaultTuple),
+ List2 = list2rec(List, Fields, Defaults, []),
+ list_to_tuple([Name] ++ List2).
+
+list2rec(_List, [], [], Acc) ->
+ Acc;
+list2rec(List, [F|Fields], [D|Defaults], Acc) ->
+ {Val, List2} =
+ case lists:keysearch(F, 1, List) of
+ false ->
+ {D, List};
+ {value, {F, NewVal}} ->
+ {NewVal, lists:keydelete(F, 1, List)}
+ end,
+ list2rec(List2, Fields, Defaults, Acc ++ [Val]).
+
+stop() ->
+ case whereis(mnesia_tpcb) of
+ undefined ->
+ {error, not_running};
+ Pid ->
+ sync_stop(Pid)
+ end.
+
+sync_stop(Pid) ->
+ Pid ! {self(), stop},
+ receive
+ {Pid, {stopped, Res}} -> Res
+ after timer:minutes(1) ->
+ exit(Pid, kill),
+ {error, brutal_kill}
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Initialization
+
+%% Args is a list of {Key, Val} tuples where Key is a field name
+%% in the record tab_config, unknown keys are ignored.
+
+init(Args) ->
+ TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}),
+ TabConfig =
+ if
+ TabConfig0#tab_config.n_fragments =:= 0 ->
+ TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)};
+ true ->
+ TabConfig0
+ end,
+ Tags = record_info(fields, tab_config),
+ Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end,
+ {List, _} = lists:mapfoldl(Fun, 2, Tags),
+ io:format("TPC-B: Table config: ~p ~n", [List]),
+
+ DbNodes = TabConfig#tab_config.db_nodes,
+ stop(),
+ if
+ TabConfig#tab_config.use_running_mnesia =:= true ->
+ ignore;
+ true ->
+ rpc:multicall(DbNodes, mnesia, lkill, []),
+ case mnesia:delete_schema(DbNodes) of
+ ok ->
+ case mnesia:create_schema(DbNodes) of
+ ok ->
+ {Replies, BadNodes} =
+ rpc:multicall(DbNodes, mnesia, start, []),
+ case [Res || Res <- Replies, Res =/= ok] of
+ [] when BadNodes =:= [] ->
+ ok;
+ BadRes ->
+ io:format("TPC-B: <ERROR> "
+ "Failed to start ~p: ~p~n",
+ [BadNodes, BadRes]),
+ exit({start_failed, BadRes, BadNodes})
+ end;
+ {error, Reason} ->
+ io:format("TPC-B: <ERROR> "
+ "Failed to create schema on disc: ~p~n",
+ [Reason]),
+ exit({create_schema_failed, Reason})
+ end;
+ {error, Reason} ->
+ io:format("TPC-B: <ERROR> "
+ "Failed to delete schema on disc: ~p~n",
+ [Reason]),
+ exit({delete_schema_failed, Reason})
+ end
+ end,
+ gen_tabs(TabConfig).
+
+gen_tabs(TC) ->
+ create_tab(TC, branch, record_info(fields, branch),
+ undefined),
+ create_tab(TC, account, record_info(fields, account),
+ {branch, #account.branch_id}),
+ create_tab(TC, teller, record_info(fields, teller),
+ {branch, #teller.branch_id}),
+ create_tab(TC, history, record_info(fields, history),
+ {branch, #history.branch_id}),
+
+ NB = TC#tab_config.n_branches,
+ NT = TC#tab_config.n_tellers_per_branch,
+ NA = TC#tab_config.n_accounts_per_branch,
+ io:format("TPC-B: Generating ~p branches a ~p bytes~n",
+ [NB, size(term_to_binary(default_branch(TC)))]),
+ io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n",
+ [NB, NT, size(term_to_binary(default_teller(TC)))]),
+ io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n",
+ [NB, NA, size(term_to_binary(default_account(TC)))]),
+ io:format("TPC-B: Generating 0 history records a ~p bytes~n",
+ [size(term_to_binary(default_history(TC)))]),
+ gen_branches(TC),
+
+ case verify_tabs() of
+ ok ->
+ ignore;
+ {error, Reason} ->
+ io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n",
+ [Reason]),
+ exit({inconsistent_tables, Reason})
+ end.
+
+create_tab(TC, Name, Attrs, _ForeignKey) when TC#tab_config.n_fragments =:= 0 ->
+ Nodes = TC#tab_config.replica_nodes,
+ Type = TC#tab_config.replica_type,
+ Def = [{Type, Nodes}, {attributes, Attrs}],
+ create_tab(Name, Def);
+create_tab(TC, Name, Attrs, ForeignKey) ->
+ NReplicas = TC#tab_config.n_replicas,
+ NodePool = TC#tab_config.replica_nodes,
+ Type = TC#tab_config.replica_type,
+ NF = TC#tab_config.n_fragments,
+ Props = [{n_fragments, NF},
+ {node_pool, NodePool},
+ {n_copies(Type), NReplicas},
+ {foreign_key, ForeignKey}],
+ Def = [{frag_properties, Props},
+ {attributes, Attrs}],
+ create_tab(Name, Def).
+
+create_tab(Name, Def) ->
+ mnesia:delete_table(Name),
+ case mnesia:create_table(Name, Def) of
+ {atomic, ok} ->
+ ok;
+ {aborted, Reason} ->
+ io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n",
+ [Name, Def, Reason]),
+ exit({create_table_failed, Reason})
+ end.
+
+n_copies(Type) ->
+ case Type of
+ ram_copies -> n_ram_copies;
+ disc_copies -> n_disc_copies;
+ disc_only_copies -> n_disc_only_copies
+ end.
+
+gen_branches(TC) ->
+ First = 0,
+ Last = First + TC#tab_config.n_branches - 1,
+ GenPids = gen_branches(TC, First, Last, []),
+ wait_for_gen(GenPids).
+
+wait_for_gen([]) ->
+ ok;
+wait_for_gen(Pids) ->
+ receive
+ {branch_generated, Pid} -> wait_for_gen(lists:delete(Pid, Pids));
+ Exit ->
+ exit({tpcb_failed, Exit})
+ end.
+
+gen_branches(TC, BranchId, Last, UsedNs) when BranchId =< Last ->
+ UsedNs2 = get_branch_nodes(BranchId, UsedNs),
+ Node = hd(UsedNs2),
+ Pid = spawn_link(Node, ?MODULE, reply_gen_branch,
+ [self(), TC, BranchId]),
+ [Pid | gen_branches(TC, BranchId + 1, Last, UsedNs2)];
+gen_branches(_, _, _, _) ->
+ [].
+
+reply_gen_branch(ReplyTo, TC, BranchId) ->
+ gen_branch(TC, BranchId),
+ ReplyTo ! {branch_generated, self()},
+ unlink(ReplyTo).
+
+%% Returns a new list of nodes with the best node as head
+get_branch_nodes(BranchId, UsedNs) ->
+ WriteNs = table_info({branch, BranchId}, where_to_write),
+ WeightedNs = [{n_duplicates(N, UsedNs, 0), N} || N <- WriteNs],
+ [{_, LeastUsed} | _ ] = lists:sort(WeightedNs),
+ [LeastUsed | UsedNs].
+
+n_duplicates(_N, [], Count) ->
+ Count;
+n_duplicates(N, [N | Tail], Count) ->
+ n_duplicates(N, Tail, Count + 1);
+n_duplicates(N, [_ | Tail], Count) ->
+ n_duplicates(N, Tail, Count).
+
+gen_branch(TC, BranchId) ->
+ A = default_account(TC),
+ NA = TC#tab_config.n_accounts_per_branch,
+ FirstA = BranchId * NA,
+ ArgsA = [FirstA, FirstA + NA - 1, BranchId, A],
+ ok = mnesia:activity(async_dirty, fun gen_accounts/4, ArgsA, mnesia_frag),
+
+ T = default_teller(TC),
+ NT = TC#tab_config.n_tellers_per_branch,
+ FirstT = BranchId * NT,
+ ArgsT = [FirstT, FirstT + NT - 1, BranchId, T],
+ ok = mnesia:activity(async_dirty, fun gen_tellers/4, ArgsT, mnesia_frag),
+
+ B = default_branch(TC),
+ FunB = fun() -> mnesia:write(branch, B#branch{id = BranchId}, write) end,
+ ok = mnesia:activity(sync_dirty, FunB, [], mnesia_frag).
+
+gen_tellers(Id, Last, BranchId, T) when Id =< Last ->
+ mnesia:write(teller, T#teller{id = Id, branch_id=BranchId}, write),
+ gen_tellers(Id + 1, Last, BranchId, T);
+gen_tellers(_, _, _, _) ->
+ ok.
+
+gen_accounts(Id, Last, BranchId, A) when Id =< Last ->
+ mnesia:write(account, A#account{id = Id, branch_id=BranchId}, write),
+ gen_accounts(Id + 1, Last, BranchId, A);
+gen_accounts(_, _, _, _) ->
+ ok.
+
+default_branch(TC) -> #branch{filler = TC#tab_config.branch_filler}.
+default_teller(TC) -> #teller{filler = TC#tab_config.teller_filler}.
+default_account(TC) -> #account{filler = TC#tab_config.account_filler}.
+default_history(_TC) -> #history{}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Run the benchmark
+
+%% Args is a list of {Key, Val} tuples where Key is a field name
+%% in the record run_config, unknown keys are ignored.
+run(Args) ->
+ RunConfig = list2rec(Args, record_info(fields, run_config), #run_config{}),
+ Tags = record_info(fields, run_config),
+ Fun = fun(F, Pos) -> {{F, element(Pos, RunConfig)}, Pos + 1} end,
+ {List, _} = lists:mapfoldl(Fun, 2, Tags),
+ io:format("TPC-B: Run config: ~p ~n", [List]),
+
+ Pid = spawn_link(?MODULE, reporter_init, [self(), RunConfig]),
+ receive
+ {Pid, {stopped, Res}} ->
+ Res; % Stopped by other process
+ Else ->
+ {tpcb_got, Else}
+ after RunConfig#run_config.stop_after ->
+ sync_stop(Pid)
+ end.
+
+reporter_init(Starter, RC) ->
+ register(mnesia_tpcb, self()),
+ process_flag(trap_exit, true),
+ DbNodes = mnesia:system_info(db_nodes),
+ if
+ RC#run_config.use_running_mnesia =:= true ->
+ ignore;
+ true ->
+ {Replies, BadNodes} =
+ rpc:multicall(DbNodes, mnesia, start, []),
+ case [Res || Res <- Replies, Res =/= ok] of
+ [] when BadNodes =:= [] ->
+ ok;
+ BadRes ->
+ io:format("TPC-B: <ERROR> "
+ "Failed to start ~w: ~p~n",
+ [BadNodes, BadRes]),
+ exit({start_failed, BadRes, BadNodes})
+ end,
+ verify_tabs()
+ end,
+
+ N = table_info(branch, size),
+ NT = table_info(teller, size) div N,
+ NA = table_info(account, size) div N,
+
+ {Type, NF, RepNodes} = table_storage(branch),
+ TC = #tab_config{n_fragments = NF,
+ n_branches = N,
+ n_tellers_per_branch = NT,
+ n_accounts_per_branch = NA,
+ db_nodes = DbNodes,
+ replica_nodes = RepNodes,
+ replica_type = Type
+ },
+ Drivers = start_drivers(RC, TC),
+ Now = now_to_micros(erlang:now()),
+ State = #reporter_state{driver_pids = Drivers,
+ run_config = RC,
+ starter_pid = Starter,
+ init_micros = Now,
+ prev_micros = Now
+ },
+ case catch reporter_loop(State) of
+ {'EXIT', Reason} ->
+ io:format("TPC-B: Abnormal termination: ~p~n", [Reason]),
+ if
+ RC#run_config.use_running_mnesia =:= true ->
+ ignore;
+ true ->
+ rpc:multicall(DbNodes, mnesia, lkill, [])
+ end,
+ unlink(Starter),
+ Starter ! {self(), {stopped, {error, Reason}}}, % To be sure
+ exit(shutdown);
+ {ok, Stopper, State2} ->
+ Time = State2#reporter_state.acc,
+ Res =
+ case verify_tabs() of
+ ok ->
+ {ok, Time};
+ {error, Reason} ->
+ io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n",
+ [{error, Reason}]),
+ {error, Reason}
+ end,
+ if
+ RC#run_config.use_running_mnesia =:= true ->
+ ignore;
+ true ->
+ rpc:multicall(DbNodes, mnesia, stop, [])
+ end,
+ unlink(Starter),
+ Starter ! {self(), {stopped, Res}},
+ if
+ Stopper =/= Starter ->
+ Stopper ! {self(), {stopped, Res}};
+ true ->
+ ignore
+ end,
+ exit(shutdown)
+ end.
+
+table_info(Tab, Item) ->
+ Fun = fun() -> mnesia:table_info(Tab, Item) end,
+ mnesia:activity(sync_dirty, Fun, mnesia_frag).
+
+%% Returns {Storage, NFragments, ReplicaNodes}
+table_storage(Tab) ->
+ case mnesia:table_info(branch, frag_properties) of
+ [] ->
+ NFO = 0,
+ NR = length(mnesia:table_info(Tab, ram_copies)),
+ ND = length(mnesia:table_info(Tab, disc_copies)),
+ NDO = length(mnesia:table_info(Tab, disc_only_copies)),
+ if
+ NR =/= 0 -> {ram_copies, NFO, NR};
+ ND =/= 0 -> {disc_copies, NFO, ND};
+ NDO =/= 0 -> {disc_copies, NFO, NDO}
+ end;
+ Props ->
+ {value, NFO} = lists:keysearch(n_fragments, 1, Props),
+ NR = table_info(Tab, n_ram_copies),
+ ND = table_info(Tab, n_disc_copies),
+ NDO = table_info(Tab, n_disc_only_copies),
+ if
+ NR =/= 0 -> {ram_copies, NFO, NR};
+ ND =/= 0 -> {disc_copies, NFO, ND};
+ NDO =/= 0 -> {disc_copies, NFO, NDO}
+ end
+ end.
+
+reporter_loop(State) ->
+ RC = State#reporter_state.run_config,
+ receive
+ {From, stop} ->
+ {ok, From, call_drivers(State, stop)};
+ {'EXIT', Pid, Reason} when Pid =:= State#reporter_state.starter_pid ->
+ %% call_drivers(State, stop),
+ exit({starter_died, Pid, Reason})
+ after RC#run_config.report_interval ->
+ Iters = State#reporter_state.n_iters,
+ State2 = State#reporter_state{n_iters = Iters + 1},
+ case call_drivers(State2, report) of
+ State3 when State3#reporter_state.driver_pids =/= [] ->
+ State4 = State3#reporter_state{curr = #time{}},
+ reporter_loop(State4);
+ _ ->
+ exit(drivers_died)
+ end
+ end.
+
+call_drivers(State, Msg) ->
+ Drivers = State#reporter_state.driver_pids,
+ lists:foreach(fun(Pid) -> Pid ! {self(), Msg} end, Drivers),
+ State2 = show_report(calc_reports(Drivers, State)),
+ case Msg =:= stop of
+ true ->
+ Acc = State2#reporter_state.acc,
+ Init = State2#reporter_state.init_micros,
+ show_report(State2#reporter_state{n_iters = 0,
+ curr = Acc,
+ prev_micros = Init});
+ false ->
+ ignore
+ end,
+ State2.
+
+calc_reports([], State) ->
+ State;
+calc_reports([Pid|Drivers], State) ->
+ receive
+ {'EXIT', P, Reason} when P =:= State#reporter_state.starter_pid ->
+ exit({starter_died, P, Reason});
+ {'EXIT', Pid, Reason} ->
+ exit({driver_died, Pid, Reason});
+ {Pid, Time} when is_record(Time, time) ->
+ %% io:format("~w: ~w~n", [Pid, Time]),
+ A = add_time(State#reporter_state.acc, Time),
+ C = add_time(State#reporter_state.curr, Time),
+ State2 = State#reporter_state{acc = A, curr = C},
+ calc_reports(Drivers, State2)
+ end.
+
+add_time(Acc, New) ->
+ Acc#time{n_trans = New#time.n_trans + Acc#time.n_trans,
+ min_n = lists:min([New#time.n_trans, Acc#time.min_n] -- [0]),
+ max_n = lists:max([New#time.n_trans, Acc#time.max_n]),
+ acc_time = New#time.acc_time + Acc#time.acc_time,
+ max_time = lists:max([New#time.max_time, Acc#time.max_time])}.
+
+-define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end).
+
+show_report(State) ->
+ Now = now_to_micros(erlang:now()),
+ Iters = State#reporter_state.n_iters,
+ Time = State#reporter_state.curr,
+ Max = Time#time.max_time,
+ N = Time#time.n_trans,
+ Avg = ?AVOID_DIV_ZERO(Time#time.acc_time div N),
+ AliveN = length(State#reporter_state.driver_pids),
+ Tps = ?AVOID_DIV_ZERO((?SECOND * AliveN) div Avg),
+ PrevTps= State#reporter_state.prev_tps,
+ {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps),
+ Unfairness = ?AVOID_DIV_ZERO(Time#time.max_n / Time#time.min_n),
+ BruttoAvg = ?AVOID_DIV_ZERO((Now - State#reporter_state.prev_micros) div N),
+%% io:format("n_iters=~p, n_trans=~p, n_drivers=~p, avg=~p, now=~p, prev=~p~n",
+%% [Iters, N, AliveN, BruttoAvg, Now, State#reporter_state.prev_micros]),
+ BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg),
+ case Iters > 0 of
+ true ->
+ io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n",
+ [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]);
+ false ->
+ io:format("TPC-B: ~p (~p) transactions per second, "
+ "duration of longest transaction was ~p milliseconds~n",
+ [Tps, BruttoTps, Max div 1000])
+ end,
+ State#reporter_state{prev_tps = Tps, prev_micros = Now}.
+
+signed_diff(Iters, Curr, Prev) ->
+ case Iters > 1 of
+ true -> sign(Curr - Prev);
+ false -> sign(0)
+ end.
+
+sign(N) when N > 0 -> {"+", N};
+sign(N) -> {"", N}.
+
+now_to_micros({Mega, Secs, Micros}) ->
+ DT = calendar:now_to_datetime({Mega, Secs, 0}),
+ S = calendar:datetime_to_gregorian_seconds(DT),
+ (S * ?SECOND) + Micros.
+
+start_drivers(RC, TC) ->
+ LastHistoryId = table_info(history, size),
+ Reuse = RC#run_config.reuse_history_id,
+ DS = #driver_state{tab_config = TC,
+ run_config = RC,
+ n_local_branches = 0,
+ local_branches = [],
+ history_id = LastHistoryId,
+ reuse_history_id = Reuse},
+ Nodes = RC#run_config.driver_nodes,
+ NB = TC#tab_config.n_branches,
+ First = 0,
+ AllBranches = lists:seq(First, First + NB - 1),
+ ND = RC#run_config.n_drivers_per_node,
+ Spawn = fun(Spec) ->
+ Node = Spec#driver_state.driver_node,
+ spawn_link(Node, ?MODULE, driver_init, [Spec, AllBranches])
+ end,
+ Specs = [DS#driver_state{driver_id = Id, driver_node = N}
+ || N <- Nodes,
+ Id <- lists:seq(1, ND)],
+ Specs2 = lists:sort(lists:flatten(Specs)),
+ {Specs3, OrphanBranches} = alloc_local_branches(AllBranches, Specs2, []),
+ case length(OrphanBranches) of
+ N when N =< 10 ->
+ io:format("TPC-B: Orphan branches: ~p~n", [OrphanBranches]);
+ N ->
+ io:format("TPC-B: Orphan branches: ~p~n", [N])
+ end,
+ [Spawn(Spec) || Spec <- Specs3].
+
+alloc_local_branches([BranchId | Tail], Specs, OrphanBranches) ->
+ Nodes = table_info({branch, BranchId}, where_to_write),
+ LocalSpecs = [DS || DS <- Specs,
+ lists:member(DS#driver_state.driver_node, Nodes)],
+ case lists:keysort(#driver_state.n_local_branches, LocalSpecs) of
+ [] ->
+ alloc_local_branches(Tail, Specs, [BranchId | OrphanBranches]);
+ [DS | _] ->
+ LocalNB = DS#driver_state.n_local_branches + 1,
+ LocalBranches = [BranchId | DS#driver_state.local_branches],
+ DS2 = DS#driver_state{n_local_branches = LocalNB,
+ local_branches = LocalBranches},
+ Specs2 = Specs -- [DS],
+ Specs3 = [DS2 | Specs2],
+ alloc_local_branches(Tail, Specs3, OrphanBranches)
+ end;
+alloc_local_branches([], Specs, OrphanBranches) ->
+ {Specs, OrphanBranches}.
+
+driver_init(DS, AllBranches) ->
+ Seed = erlang:now(),
+ DS2 =
+ if
+ DS#driver_state.n_local_branches =:= 0 ->
+ DS#driver_state{seed = Seed,
+ n_local_branches = length(AllBranches),
+ local_branches = AllBranches};
+ true ->
+ DS#driver_state{seed = Seed}
+ end,
+ io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n",
+ [DS2#driver_state.driver_id, self(), node(), DS2#driver_state.n_local_branches]),
+ driver_loop(DS2).
+
+driver_loop(DS) ->
+ receive
+ {From, report} ->
+ From ! {self(), DS#driver_state.time},
+ Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time),
+ DS2 = DS#driver_state{time=#time{}, acc_time = Acc}, % Reset timer
+ DS3 = calc_trans(DS2),
+ driver_loop(DS3);
+ {From, stop} ->
+ Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time),
+ io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n",
+ [DS#driver_state.driver_id, self(), node(self()), Acc]),
+ From ! {self(), DS#driver_state.time},
+ unlink(From),
+ exit(stopped)
+ after 0 ->
+ DS2 = calc_trans(DS),
+ driver_loop(DS2)
+ end.
+
+calc_trans(DS) ->
+ {Micros, DS2} = time_trans(DS),
+ Time = DS2#driver_state.time,
+ Time2 = Time#time{n_trans = Time#time.n_trans + 1,
+ acc_time = Time#time.acc_time + Micros,
+ max_time = lists:max([Micros, Time#time.max_time])
+ },
+ case DS#driver_state.reuse_history_id of
+ false ->
+ HistoryId = DS#driver_state.history_id + 1,
+ DS2#driver_state{time=Time2, history_id = HistoryId};
+ true ->
+ DS2#driver_state{time=Time2}
+ end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% Generate teller_id, account_id and delta
+%% Time the TPC-B transaction
+time_trans(DS) ->
+ OldSeed = get(random_seed), % Avoid interference with Mnesia
+ put(random_seed, DS#driver_state.seed),
+ Random = random:uniform(),
+ NewSeed = get(random_seed),
+ case OldSeed of
+ undefined -> erase(random_seed);
+ _ -> put(random_seed, OldSeed)
+ end,
+
+ TC = DS#driver_state.tab_config,
+ RC = DS#driver_state.run_config,
+ {Branchid, Args} = random_to_args(Random, DS),
+ {Fun, Mod} = trans_type(TC, RC),
+ {Time, Res} = timer:tc(?MODULE, real_trans, [RC, Branchid, Fun, Args, Mod]),
+
+ case Res of
+ AccountBal when is_integer(AccountBal) ->
+ {Time, DS#driver_state{seed = NewSeed}};
+ Other ->
+ exit({crash, Other, Args, Random, DS})
+ end.
+
+random_to_args(Random, DS) ->
+ DriverId = DS#driver_state.driver_id,
+ TC = DS#driver_state.tab_config,
+ HistoryId = DS#driver_state.history_id,
+ Delta = trunc(Random * 1999998) - 999999, % -999999 <= Delta <= +999999
+
+ Branches = DS#driver_state.local_branches,
+ NB = DS#driver_state.n_local_branches,
+ NT = TC#tab_config.n_tellers_per_branch,
+ NA = TC#tab_config.n_accounts_per_branch,
+ Tmp = trunc(Random * NB * NT),
+ BranchPos = (Tmp div NT) + 1,
+ BranchId =
+ case TC#tab_config.n_fragments of
+ 0 -> BranchPos - 1;
+ _ -> lists:nth(BranchPos, Branches)
+ end,
+ RelativeTellerId = Tmp div NT,
+ TellerId = (BranchId * NT) + RelativeTellerId,
+ {AccountBranchId, AccountId} =
+ if
+ Random >= 0.85, NB > 1 ->
+ %% Pick from a remote account
+ TmpAccountId= trunc(Random * (NB - 1) * NA),
+ TmpAccountBranchId = TmpAccountId div NA,
+ if
+ TmpAccountBranchId =:= BranchId ->
+ {TmpAccountBranchId + 1, TmpAccountId + NA};
+ true ->
+ {TmpAccountBranchId, TmpAccountId}
+ end;
+ true ->
+ %% Pick from a local account
+ RelativeAccountId = trunc(Random * NA),
+ TmpAccountId = (BranchId * NA) + RelativeAccountId,
+ {BranchId, TmpAccountId}
+ end,
+
+ {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}.
+
+real_trans(RC, BranchId, Fun, Args, Mod) ->
+ Type = RC#run_config.activity_type,
+ case RC#run_config.spawn_near_branch of
+ false ->
+ mnesia:activity(Type, Fun, Args, Mod);
+ true ->
+ Node = table_info({branch, BranchId}, where_to_read),
+ case rpc:call(Node, mnesia, activity, [Type, Fun, Args, Mod]) of
+ {badrpc, Reason} -> exit(Reason);
+ Other -> Other
+ end
+ end.
+
+trans_type(TC, RC) ->
+ if
+ TC#tab_config.n_fragments =:= 0,
+ RC#run_config.use_sticky_locks =:= false ->
+ {fun add_delta/7, mnesia};
+ TC#tab_config.n_fragments =:= 0,
+ RC#run_config.use_sticky_locks =:= true ->
+ {fun sticky_add_delta/7, mnesia};
+ TC#tab_config.n_fragments > 0,
+ RC#run_config.use_sticky_locks =:= false ->
+ {fun frag_add_delta/7, mnesia_frag}
+ end.
+
+%%
+%% Runs the TPC-B defined transaction and returns NewAccountBalance
+%%
+
+add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
+ %% Grab write lock already when the record is read
+
+ %% Add delta to branch balance
+ [B] = mnesia:read(branch, BranchId, write),
+ NewB = B#branch{balance = B#branch.balance + Delta},
+ ok = mnesia:write(branch, NewB, write),
+
+ %% Add delta to teller balance
+ [T] = mnesia:read(teller, TellerId, write),
+ NewT = T#teller{balance = T#teller.balance + Delta},
+ ok = mnesia:write(teller, NewT, write),
+
+ %% Add delta to account balance
+ [A] = mnesia:read(account, AccountId, write),
+ NewA = A#account{balance = A#account.balance + Delta},
+ ok = mnesia:write(account, NewA, write),
+
+ %% Append to history log
+ History = #history{history_id = {DriverId, HistoryId},
+ account_id = AccountId,
+ teller_id = TellerId,
+ branch_id = BranchId,
+ amount = Delta
+ },
+ ok = mnesia:write(history, History, write),
+
+ %% Return account balance
+ NewA#account.balance.
+
+sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
+ %% Grab orinary read lock when the record is read
+ %% Grab sticky write lock when the record is written
+ %% This transaction would benefit of an early stick_write lock at read
+
+ %% Add delta to branch balance
+ [B] = mnesia:read(branch, BranchId, read),
+ NewB = B#branch{balance = B#branch.balance + Delta},
+ ok = mnesia:write(branch, NewB, sticky_write),
+
+ %% Add delta to teller balance
+ [T] = mnesia:read(teller, TellerId, read),
+ NewT = T#teller{balance = T#teller.balance + Delta},
+ ok = mnesia:write(teller, NewT, sticky_write),
+
+ %% Add delta to account balance
+ [A] = mnesia:read(account, AccountId, read),
+ NewA = A#account{balance = A#account.balance + Delta},
+ ok = mnesia:write(account, NewA, sticky_write),
+
+ %% Append to history log
+ History = #history{history_id = {DriverId, HistoryId},
+ account_id = AccountId,
+ teller_id = TellerId,
+ branch_id = BranchId,
+ amount = Delta
+ },
+ ok = mnesia:write(history, History, sticky_write),
+
+ %% Return account balance
+ NewA#account.balance.
+
+frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) ->
+ %% Access fragmented table
+ %% Grab write lock already when the record is read
+
+ %% Add delta to branch balance
+ [B] = mnesia:read(branch, BranchId, write),
+ NewB = B#branch{balance = B#branch.balance + Delta},
+ ok = mnesia:write(NewB),
+
+ %% Add delta to teller balance
+ [T] = mnesia:read({teller, BranchId}, TellerId, write),
+ NewT = T#teller{balance = T#teller.balance + Delta},
+ ok = mnesia:write(NewT),
+
+ %% Add delta to account balance
+ %%io:format("frag_add_delta(~p): ~p\n", [node(), {account, BranchId, AccountId}]),
+ [A] = mnesia:read({account, AccountBranchId}, AccountId, write),
+ NewA = A#account{balance = A#account.balance + Delta},
+ ok = mnesia:write(NewA),
+
+ %% Append to history log
+ History = #history{history_id = {DriverId, HistoryId},
+ account_id = AccountId,
+ teller_id = TellerId,
+ branch_id = BranchId,
+ amount = Delta
+ },
+ ok = mnesia:write(History),
+
+ %% Return account balance
+ NewA#account.balance.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Verify table consistency
+
+verify_tabs() ->
+ Nodes = mnesia:system_info(running_db_nodes),
+ case lists:member(node(), Nodes) of
+ true ->
+ Tabs = [branch, teller, account, history],
+ io:format("TPC-B: Verifying tables: ~w~n", [Tabs]),
+ rpc:multicall(Nodes, mnesia, wait_for_tables, [Tabs, infinity]),
+
+ Fun = fun() ->
+ mnesia:write_lock_table(branch),
+ mnesia:write_lock_table(teller),
+ mnesia:write_lock_table(account),
+ mnesia:write_lock_table(history),
+ {Res, BadNodes} =
+ rpc:multicall(Nodes, ?MODULE, count_balance, []),
+ check_balance(Res, BadNodes)
+ end,
+ case mnesia:transaction(Fun) of
+ {atomic, Res} -> Res;
+ {aborted, Reason} -> {error, Reason}
+ end;
+ false ->
+ {error, "Must be initiated from a running db_node"}
+ end.
+
+%% Returns a list of {Table, Node, Balance} tuples
+%% Assumes that no updates are performed
+
+-record(summary, {table, node, balance, size}).
+
+count_balance() ->
+ [count_balance(branch, #branch.balance),
+ count_balance(teller, #teller.balance),
+ count_balance(account, #account.balance)].
+
+count_balance(Tab, BalPos) ->
+ Frags = table_info(Tab, frag_names),
+ count_balance(Tab, Frags, 0, 0, BalPos).
+
+count_balance(Tab, [Frag | Frags], Bal, Size, BalPos) ->
+ First = mnesia:dirty_first(Frag),
+ {Bal2, Size2} = count_frag_balance(Frag, First, Bal, Size, BalPos),
+ count_balance(Tab, Frags, Bal2, Size2, BalPos);
+count_balance(Tab, [], Bal, Size, _BalPos) ->
+ #summary{table = Tab, node = node(), balance = Bal, size = Size}.
+
+count_frag_balance(_Frag, '$end_of_table', Bal, Size, _BalPos) ->
+ {Bal, Size};
+count_frag_balance(Frag, Key, Bal, Size, BalPos) ->
+ [Record] = mnesia:dirty_read({Frag, Key}),
+ Bal2 = Bal + element(BalPos, Record),
+ Next = mnesia:dirty_next(Frag, Key),
+ count_frag_balance(Frag, Next, Bal2, Size + 1, BalPos).
+
+check_balance([], []) ->
+ mnesia:abort({"No balance"});
+check_balance(Summaries, []) ->
+ [One | Rest] = lists:flatten(Summaries),
+ Balance = One#summary.balance,
+ %% Size = One#summary.size,
+ case [S || S <- Rest, S#summary.balance =/= Balance] of
+ [] ->
+ ok;
+ BadSummaries ->
+ mnesia:abort({"Bad balance", One, BadSummaries})
+ end;
+check_balance(_, BadNodes) ->
+ mnesia:abort({"Bad nodes", BadNodes}).