%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1996-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%% MODULE
%%
%% mnesia_tpcb - TPC-B benchmarking of Mnesia
%%
%% DESCRIPTION
%%
%% The metrics used in the TPC-B benchmark are throughput as measured
%% in transactions per second (TPS). The benchmark uses a single,
%% simple update-intensive transaction to load the database system.
%% The single transaction type provides a simple, repeatable
%% unit of work, and is designed to exercise the basic components of
%% a database system.
%%
%% The TPC-B definition states lots of detailed rules and
%% conditions that must be fulfilled, e.g. how the ACID (atomicity,
%% consistency, isolation and durability) properties are verified,
%% how the random numbers must be distributed, minimum sizes of
%% the different types of records, minimum duration of the benchmark,
%% formulas to calculate prices (dollars per tps), disclosure issues
%% etc. Please see http://www.tpc.org/ for the nitty-gritty details.
%%
%% The TPC-B benchmark is stated in terms of a hypothetical bank. The
%% bank has one or more branches. Each branch has multiple tellers. The
%% bank has many customers, each with an account. The database represents
%% the cash position of each entity (branch, teller and account) and a
%% history of recent transactions run by the bank. The transaction
%% represents the work done when a customer makes a deposit or a
%% withdrawal against his account. The transaction is performed by a
%% teller at some branch.
%%
%% Each process that performs TPC-B transactions is called a driver.
%% Drivers randomly generate a teller_id, an account_id and a delta
%% amount of money. An account, a teller and a branch are read, their
%% balances are adjusted and a history record is created. The driver
%% measures the time for 3 reads, 3 writes and 1 create.
%%
%% GETTING STARTED
%%
%% Generate tables and run with default configuration:
%%
%% mnesia_tpcb:start().
%%
%% A little bit more advanced:
%%
%% spawn(mnesia_tpcb, start, [[{n_drivers_per_node, 8}, {stop_after, infinity}]]),
%% mnesia_tpcb:stop().
%%
%% Really advanced:
%%
%% mnesia_tpcb:init([{n_branches, 8}, {replica_type, disc_only_copies}]),
%% mnesia_tpcb:run([{n_drivers_per_node, 8}]),
%% mnesia_tpcb:run([{n_drivers_per_node, 64}]).
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-module(mnesia_tpcb).
-author('hakan@erix.ericsson.se').
-export([
config/2,
count_balance/0,
driver_init/2,
init/1,
reporter_init/2,
run/1,
start/0,
start/1,
start/2,
stop/0,
real_trans/5,
verify_tabs/0,
reply_gen_branch/3,
frag_add_delta/7,
conflict_test/1,
dist_test/1,
replica_test/1,
sticky_replica_test/1,
remote_test/1,
remote_frag2_test/1
]).
%% Microseconds per second. Transaction times come from timer:tc/4 and
%% now_to_micros/1, both of which work in microseconds.
-define(SECOND, 1000000).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Account record, total size must be at least 100 bytes
%% The fillers below are padding only; the benchmark logic never inspects them.
-define(ACCOUNT_FILLER,
{123456789012345678901234567890123456789012345678901234567890,
123456789012345678901234567890123456789012345678901234567890,
123456789012345678901234567890123456789012345678901234}).
-record(account,
{
id = 0, % Unique account id
branch_id = 0, % Branch where the account is held
balance = 0, % Account balance
filler = ?ACCOUNT_FILLER % Gap filler to ensure size >= 100 bytes
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Branch record, total size must be at least 100 bytes
-define(BRANCH_FILLER,
{123456789012345678901234567890123456789012345678901234567890,
123456789012345678901234567890123456789012345678901234567890,
123456789012345678901234567890123456789012345678901234567890}).
-record(branch,
{
id = 0, % Unique branch id
balance = 0, % Total balance of whole branch
filler = ?BRANCH_FILLER % Gap filler to ensure size >= 100 bytes
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Teller record, total size must be at least 100 bytes
-define(TELLER_FILLER,
{123456789012345678901234567890123456789012345678901234567890,
123456789012345678901234567890123456789012345678901234567890,
1234567890123456789012345678901234567890123456789012345678}).
-record(teller,
{
id = 0, % Unique teller id
branch_id = 0, % Branch where the teller is located
balance = 0, % Teller balance
filler = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% History record, total size must be at least 50 bytes
-define(HISTORY_FILLER, 1234567890).
-record(history,
{
history_id = {0, 0}, % {DriverId, DriverLocalHistoryid}
time_stamp = now(), % Time point during active transaction (default evaluated when the record is built)
branch_id = 0, % Branch associated with teller
teller_id = 0, % Teller involved in transaction
account_id = 0, % Account updated by transaction
amount = 0, % Amount (delta) specified by transaction
filler = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Table layout parameters. init/1 fills this from its Args via list2rec/3;
%% reporter_init/2 rebuilds one from the tables that already exist.
%% n_fragments = 0 means plain (non-fragmented) tables.
-record(tab_config,
{
db_nodes = [node()],
n_replicas = 1, % Ignored for non-fragmented tables
replica_nodes = [node()],
replica_type = ram_copies,
use_running_mnesia = false,
n_fragments = 0,
n_branches = 1,
n_tellers_per_branch = 10, % Must be 10
n_accounts_per_branch = 100000, % Must be 100000
branch_filler = ?BRANCH_FILLER,
account_filler = ?ACCOUNT_FILLER,
teller_filler = ?TELLER_FILLER
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Benchmark run parameters, filled from run/1's Args via list2rec/3.
%% stop_after and report_interval are in milliseconds (timer:minutes/1).
-record(run_config,
{
driver_nodes = [node()],
n_drivers_per_node = 1,
use_running_mnesia = false,
stop_after = timer:minutes(15), % Minimum 15 min
report_interval = timer:minutes(1),
use_sticky_locks = false,
spawn_near_branch = false,
activity_type = transaction,
reuse_history_id = false
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Transaction statistics for one interval (or accumulated over many).
%% acc_time and max_time are in microseconds (measured with timer:tc/4).
%% min_n/max_n are the smallest non-zero and the largest transaction counts
%% merged in by add_time/2.
-record(time,
{
n_trans = 0,
min_n = 0,
max_n = 0,
acc_time = 0,
max_time = 0
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% State of one driver process (driver_init/2, driver_loop/1).
%% seed is the driver's private random seed (see time_trans/1);
%% time is the current interval, acc_time everything already reported.
-record(driver_state,
{
driver_id,
driver_node,
seed,
n_local_branches,
local_branches,
tab_config,
run_config,
history_id,
time = #time{},
acc_time = #time{},
reuse_history_id
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% State of the reporter process (reporter_init/2, reporter_loop/1).
%% curr covers the current report interval, acc the whole run;
%% init_micros/prev_micros are now_to_micros/1 timestamps.
-record(reporter_state,
{
driver_pids,
starter_pid,
n_iters = 0,
prev_tps = 0,
curr = #time{},
acc = #time{},
init_micros,
prev_micros,
run_config
}).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Predefined benchmark configurations for start/2.
%% Each returns a {Key, Val} list accepted by start/1: the
%% configuration-specific options followed by the common short-run
%% options from short_run_config/0. "Nodes" is this node followed by all
%% connected nodes; "Remote" is the connected nodes only.
config(What, ReplicaType) ->
    Local = node(),
    Remote = nodes(),
    Specific = test_config(What, ReplicaType, Local, Remote, [Local | Remote]),
    Specific ++ short_run_config().

%% One driver on each node, table not replicated
test_config(frag_test, ReplicaType, _Local, _Remote, Nodes) ->
    NNodes = length(Nodes),
    [{n_branches, NNodes},
     {n_fragments, NNodes},
     {replica_nodes, Nodes},
     {db_nodes, Nodes},
     {driver_nodes, Nodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType}];
%% One driver on each node, table replicated to two nodes.
test_config(frag2_test, ReplicaType, _Local, _Remote, Nodes) ->
    NNodes = length(Nodes),
    [{n_branches, NNodes},
     {n_fragments, NNodes},
     {n_replicas, 2},
     {replica_nodes, Nodes},
     {db_nodes, Nodes},
     {driver_nodes, Nodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType}];
%% One driver on this node, table replicated to all nodes.
test_config(replica_test, ReplicaType, Local, _Remote, Nodes) ->
    [{db_nodes, Nodes},
     {driver_nodes, [Local]},
     {replica_nodes, Nodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType}];
%% One driver on this node, table replicated to all nodes, sticky locks.
test_config(sticky_replica_test, ReplicaType, Local, _Remote, Nodes) ->
    [{db_nodes, Nodes},
     {driver_nodes, [Local]},
     {replica_nodes, Nodes},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType},
     {use_sticky_locks, true}];
%% Ten drivers per node, tables replicated to all nodes, lots of branches
test_config(dist_test, ReplicaType, _Local, _Remote, Nodes) ->
    [{db_nodes, Nodes},
     {driver_nodes, Nodes},
     {replica_nodes, Nodes},
     {n_drivers_per_node, 10},
     {n_branches, 10 * length(Nodes) * 100},
     {n_accounts_per_branch, 10},
     {replica_type, ReplicaType}];
%% Ten drivers per node, tables replicated to all nodes, single branch
test_config(conflict_test, ReplicaType, _Local, _Remote, Nodes) ->
    [{db_nodes, Nodes},
     {driver_nodes, Nodes},
     {replica_nodes, Nodes},
     {n_drivers_per_node, 10},
     {n_branches, 1},
     {n_accounts_per_branch, 10},
     {replica_type, ReplicaType}];
%% One driver on this node, table replicated to all other nodes.
test_config(remote_test, ReplicaType, Local, Remote, Nodes) ->
    [{db_nodes, Nodes},
     {driver_nodes, [Local]},
     {replica_nodes, Remote},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType}];
%% One driver on this node, table replicated to two other nodes.
test_config(remote_frag2_test, ReplicaType, Local, Remote, Nodes) ->
    NRemote = length(Remote),
    [{n_branches, NRemote},
     {n_fragments, NRemote},
     {n_replicas, 2},
     {replica_nodes, Remote},
     {db_nodes, Nodes},
     {driver_nodes, [Local]},
     {n_accounts_per_branch, 100},
     {replica_type, ReplicaType}].

%% Options shared by every predefined configuration: a one minute run
%% reporting every ten seconds, reusing history ids.
short_run_config() ->
    [{stop_after, timer:minutes(1)},
     {report_interval, timer:seconds(10)},
     {reuse_history_id, true}].
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Runs the predefined configuration What (see config/2) in a new
%% process linked to the caller and returns that process's pid.
start(What, ReplicaType) ->
    Args = config(What, ReplicaType),
    spawn_link(?MODULE, start, [Args]).
%% One exported shortcut per predefined configuration.
replica_test(Type) -> start(replica_test, Type).
sticky_replica_test(Type) -> start(sticky_replica_test, Type).
dist_test(Type) -> start(dist_test, Type).
conflict_test(Type) -> start(conflict_test, Type).
remote_test(Type) -> start(remote_test, Type).
remote_frag2_test(Type) -> start(remote_frag2_test, Type).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Args is a list of {Key, Val} tuples where Key is a field name
%% in either the record tab_config or run_config. Unknown keys are ignored.
%% Creates and populates the tables (init/1), then runs the benchmark
%% (run/1) and returns run/1's result.
start() ->
    NoArgs = [],
    start(NoArgs).
start(Args) ->
    _ = init(Args),
    run(Args).
%% Builds a record from a {Key, Val} option list.
%%   List         - {Key, Val} tuples; keys that are not in Fields are
%%                  ignored, and for a repeated key the first one wins.
%%   Fields       - the record's field names (record_info(fields, Rec)).
%%   DefaultTuple - a record instance supplying values for absent keys.
%% Returns a new tuple of the same record type.
list2rec(List, Fields, DefaultTuple) ->
    [Name | Defaults] = tuple_to_list(DefaultTuple),
    Values = list2rec(List, Fields, Defaults, []),
    list_to_tuple([Name | Values]).

%% Collects one value per field. Values are accumulated in reverse and
%% flipped once at the end, which is linear, unlike appending with ++.
list2rec(_List, [], [], Acc) ->
    lists:reverse(Acc);
list2rec(List, [F | Fields], [D | Defaults], Acc) ->
    Val =
        case lists:keyfind(F, 1, List) of
            {F, NewVal} -> NewVal;
            false -> D
        end,
    list2rec(List, Fields, Defaults, [Val | Acc]).
%% Stops the benchmark run registered as mnesia_tpcb, if any.
%% Returns the run's result, or {error, not_running}.
stop() ->
    Reporter = whereis(mnesia_tpcb),
    if
        Reporter =:= undefined -> {error, not_running};
        true -> sync_stop(Reporter)
    end.

%% Sends {self(), stop} to Pid and waits up to one minute for its
%% {Pid, {stopped, Res}} reply, returning Res. On timeout Pid is killed
%% and {error, brutal_kill} is returned.
sync_stop(Pid) ->
    Timeout = timer:minutes(1),
    Pid ! {self(), stop},
    receive
        {Pid, {stopped, Res}} ->
            Res
    after Timeout ->
        exit(Pid, kill),
        {error, brutal_kill}
    end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Initialization
%% Args is a list of {Key, Val} tuples where Key is a field name
%% in the record tab_config, unknown keys are ignored.
%%
%% Creates the benchmark database. It prints the effective table config
%% and stops any run still registered as mnesia_tpcb (stop/0). Unless
%% use_running_mnesia is true, it then kills Mnesia on all db_nodes,
%% deletes and recreates the schema there, and starts Mnesia again.
%% Finally it creates and populates the tables with gen_tabs/1.
%% Exits on any failure.
init(Args) ->
TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}),
%% Without fragmentation every replica node gets its own copy (see
%% create_tab/4), so the replica count follows from replica_nodes.
TabConfig =
if
TabConfig0#tab_config.n_fragments =:= 0 ->
TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)};
true ->
TabConfig0
end,
Tags = record_info(fields, tab_config),
%% Pair each field name with its value; field values start at tuple position 2
Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end,
{List, _} = lists:mapfoldl(Fun, 2, Tags),
io:format("TPC-B: Table config: ~p ~n", [List]),
DbNodes = TabConfig#tab_config.db_nodes,
%% Stop a benchmark run that may still be going on from an earlier start
stop(),
if
TabConfig#tab_config.use_running_mnesia =:= true ->
ignore;
true ->
%% Kill Mnesia on every db node (via rpc) before replacing the schema
rpc:multicall(DbNodes, mnesia, lkill, []),
case mnesia:delete_schema(DbNodes) of
ok ->
case mnesia:create_schema(DbNodes) of
ok ->
{Replies, BadNodes} =
rpc:multicall(DbNodes, mnesia, start, []),
%% Every node must answer ok and none may be unreachable
case [Res || Res <- Replies, Res =/= ok] of
[] when BadNodes =:= [] ->
ok;
BadRes ->
io:format("TPC-B: <ERROR> "
"Failed to start ~p: ~p~n",
[BadNodes, BadRes]),
exit({start_failed, BadRes, BadNodes})
end;
{error, Reason} ->
io:format("TPC-B: <ERROR> "
"Failed to create schema on disc: ~p~n",
[Reason]),
exit({create_schema_failed, Reason})
end;
{error, Reason} ->
io:format("TPC-B: <ERROR> "
"Failed to delete schema on disc: ~p~n",
[Reason]),
exit({delete_schema_failed, Reason})
end
end,
gen_tabs(TabConfig).
%% Creates the branch, account, teller and history tables. The last three
%% are keyed to the branch table through their branch_id field (used as
%% the foreign key when fragmented). It then generates all branches with
%% their tellers and accounts and verifies the balances.
%% Exits with {inconsistent_tables, Reason} if the verification fails.
gen_tabs(TC) ->
    TabSpecs = [{branch, record_info(fields, branch), undefined},
                {account, record_info(fields, account), {branch, #account.branch_id}},
                {teller, record_info(fields, teller), {branch, #teller.branch_id}},
                {history, record_info(fields, history), {branch, #history.branch_id}}],
    lists:foreach(fun({Name, Attrs, ForeignKey}) ->
                          create_tab(TC, Name, Attrs, ForeignKey)
                  end,
                  TabSpecs),
    #tab_config{n_branches = NB,
                n_tellers_per_branch = NT,
                n_accounts_per_branch = NA} = TC,
    EncodedSize = fun(Term) -> byte_size(term_to_binary(Term)) end,
    io:format("TPC-B: Generating ~p branches a ~p bytes~n",
              [NB, EncodedSize(default_branch(TC))]),
    io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n",
              [NB, NT, EncodedSize(default_teller(TC))]),
    io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n",
              [NB, NA, EncodedSize(default_account(TC))]),
    io:format("TPC-B: Generating 0 history records a ~p bytes~n",
              [EncodedSize(default_history(TC))]),
    gen_branches(TC),
    case verify_tabs() of
        {error, Reason} ->
            io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n",
                      [Reason]),
            exit({inconsistent_tables, Reason});
        ok ->
            ignore
    end.
%% Creates table Name with attributes Attrs, according to the table config.
%% Plain tables (n_fragments =:= 0) get one replica_type copy on each of
%% replica_nodes, and ForeignKey is not used. Fragmented tables get
%% n_fragments fragments over the replica_nodes pool, with n_replicas
%% copies each, linked to another table through ForeignKey.
create_tab(#tab_config{n_fragments = 0,
                       replica_nodes = Nodes,
                       replica_type = Type}, Name, Attrs, _ForeignKey) ->
    create_tab(Name, [{Type, Nodes}, {attributes, Attrs}]);
create_tab(#tab_config{n_fragments = NF,
                       n_replicas = NReplicas,
                       replica_nodes = NodePool,
                       replica_type = Type}, Name, Attrs, ForeignKey) ->
    FragProps = [{n_fragments, NF},
                 {node_pool, NodePool},
                 {n_copies(Type), NReplicas},
                 {foreign_key, ForeignKey}],
    create_tab(Name, [{frag_properties, FragProps}, {attributes, Attrs}]).

%% (Re)creates table Name from definition Def, deleting any previous table
%% of that name first (that result is ignored, since the table may not
%% exist yet). Exits with {create_table_failed, Reason} on failure.
create_tab(Name, Def) ->
    mnesia:delete_table(Name),
    case mnesia:create_table(Name, Def) of
        {aborted, Reason} ->
            io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n",
                      [Name, Def, Reason]),
            exit({create_table_failed, Reason});
        {atomic, ok} ->
            ok
    end.
%% Maps a Mnesia storage type to its frag_properties copy-count key.
n_copies(Type) ->
    case Type of
        disc_only_copies -> n_disc_only_copies;
        disc_copies -> n_disc_copies;
        ram_copies -> n_ram_copies
    end.
%% Generates branches 0..n_branches-1 in parallel, one generator process
%% per branch, and waits until every generator has reported back.
gen_branches(TC) ->
    NBranches = TC#tab_config.n_branches,
    wait_for_gen(gen_branches(TC, 0, NBranches - 1, [])).
%% Waits for a {branch_generated, Pid} message for every pid in Pids.
%% Any other message aborts with exit({tpcb_failed, Msg}).
wait_for_gen(Pids) ->
    case Pids of
        [] ->
            ok;
        _ ->
            receive
                {branch_generated, Done} ->
                    wait_for_gen(lists:delete(Done, Pids));
                Other ->
                    exit({tpcb_failed, Other})
            end
    end.
%% Spawns one linked generator per branch in BranchId..Last. Each runs on
%% the node chosen by get_branch_nodes/2 (the writable node used least so
%% far). Returns the generator pids in branch order.
gen_branches(TC, BranchId, Last, UsedNs) when BranchId =< Last ->
    UsedNs2 = get_branch_nodes(BranchId, UsedNs),
    [Node | _] = UsedNs2,
    Pid = spawn_link(Node, ?MODULE, reply_gen_branch, [self(), TC, BranchId]),
    [Pid | gen_branches(TC, BranchId + 1, Last, UsedNs2)];
gen_branches(_TC, _BranchId, _Last, _UsedNs) ->
    [].

%% Generator body: builds one branch, tells ReplyTo it is done and then
%% drops the link to ReplyTo.
reply_gen_branch(ReplyTo, TC, BranchId) ->
    gen_branch(TC, BranchId),
    ReplyTo ! {branch_generated, self()},
    unlink(ReplyTo).
%% Returns UsedNs with the best node for BranchId prepended: of the nodes
%% the branch record is written to, the one occurring least often in
%% UsedNs (ties broken by term order of the node names).
get_branch_nodes(BranchId, UsedNs) ->
    Candidates = table_info({branch, BranchId}, where_to_write),
    Weighted = [{n_duplicates(Node, UsedNs, 0), Node} || Node <- Candidates],
    [{_Uses, LeastUsed} | _] = lists:sort(Weighted),
    [LeastUsed | UsedNs].
%% Returns Count plus the number of elements of List that exactly match N.
n_duplicates(N, List, Count) ->
    lists:foldl(fun(X, Acc) when X =:= N -> Acc + 1;
                   (_, Acc) -> Acc
                end,
                Count, List).
%% Populates one branch: first its accounts, then its tellers, and last
%% the branch record itself. Account and teller ids form contiguous
%% ranges starting at BranchId * n_accounts_per_branch and
%% BranchId * n_tellers_per_branch respectively.
gen_branch(TC, BranchId) ->
    #tab_config{n_accounts_per_branch = NA,
                n_tellers_per_branch = NT} = TC,
    FirstAccount = BranchId * NA,
    AccountArgs = [FirstAccount, FirstAccount + NA - 1, BranchId, default_account(TC)],
    ok = mnesia:activity(async_dirty, fun gen_accounts/4, AccountArgs, mnesia_frag),
    FirstTeller = BranchId * NT,
    TellerArgs = [FirstTeller, FirstTeller + NT - 1, BranchId, default_teller(TC)],
    ok = mnesia:activity(async_dirty, fun gen_tellers/4, TellerArgs, mnesia_frag),
    Branch = (default_branch(TC))#branch{id = BranchId},
    WriteBranch = fun() -> mnesia:write(branch, Branch, write) end,
    ok = mnesia:activity(sync_dirty, WriteBranch, [], mnesia_frag).
%% Writes tellers Id..Last of branch BranchId, using T as the template.
gen_tellers(Id, Last, _BranchId, _T) when Id > Last ->
    ok;
gen_tellers(Id, Last, BranchId, T) ->
    mnesia:write(teller, T#teller{id = Id, branch_id = BranchId}, write),
    gen_tellers(Id + 1, Last, BranchId, T).

%% Writes accounts Id..Last of branch BranchId, using A as the template.
gen_accounts(Id, Last, _BranchId, _A) when Id > Last ->
    ok;
gen_accounts(Id, Last, BranchId, A) ->
    mnesia:write(account, A#account{id = Id, branch_id = BranchId}, write),
    gen_accounts(Id + 1, Last, BranchId, A).
%% Template records for generated rows; only the filler is taken from the
%% table config (history records use the plain record defaults).
default_branch(#tab_config{branch_filler = Filler}) -> #branch{filler = Filler}.
default_teller(#tab_config{teller_filler = Filler}) -> #teller{filler = Filler}.
default_account(#tab_config{account_filler = Filler}) -> #account{filler = Filler}.
default_history(_TC) -> #history{}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% Run the benchmark
%% Args is a list of {Key, Val} tuples where Key is a field name
%% in the record run_config, unknown keys are ignored.
%%
%% Spawns the linked reporter (which starts the drivers) and waits for the
%% run to end. Either another process stops it (stop/0), or stop_after
%% milliseconds pass and this process stops it via sync_stop/1.
%% Returns the reporter's result ({ok, #time{}} or {error, Reason}), or
%% {tpcb_got, Msg} if an unexpected message Msg arrives first.
run(Args) ->
    RunConfig = list2rec(Args, record_info(fields, run_config), #run_config{}),
    Tags = record_info(fields, run_config),
    %% Pair each field name with its value; field values start at tuple position 2
    Fun = fun(F, Pos) -> {{F, element(Pos, RunConfig)}, Pos + 1} end,
    {List, _} = lists:mapfoldl(Fun, 2, Tags),
    io:format("TPC-B: Run config: ~p ~n", [List]),
    Pid = spawn_link(?MODULE, reporter_init, [self(), RunConfig]),
    receive
        {Pid, {stopped, Res}} ->
            Res; % Stopped by other process
        {'EXIT', Pid, _Reason} = Exit ->
            %% Only delivered when the caller traps exits: the reporter is
            %% already gone, so there is nothing left to stop.
            {tpcb_got, Exit};
        Else ->
            %% Nobody else would ever stop the reporter and its drivers
            %% once we return, so shut the run down before reporting Else.
            _ = sync_stop(Pid),
            {tpcb_got, Else}
    after RunConfig#run_config.stop_after ->
        sync_stop(Pid)
    end.
%% Body of the reporter process spawned by run/1; registers itself as
%% mnesia_tpcb. Unless use_running_mnesia is true it starts Mnesia on all
%% db_nodes and verifies the tables. It then derives a tab_config from the
%% existing tables, starts the drivers and runs reporter_loop/1.
%% After a normal stop it verifies the tables again, stops Mnesia on the
%% db_nodes (unless use_running_mnesia) and sends {self(), {stopped, Res}}
%% to Starter and, if different, to the process that asked it to stop.
%% After an abnormal end it kills Mnesia (lkill) on the db_nodes instead
%% (unless use_running_mnesia) and sends {stopped, {error, Reason}} to
%% Starter. Always finishes with exit(shutdown).
reporter_init(Starter, RC) ->
register(mnesia_tpcb, self()),
%% Deaths of the starter and of the linked drivers arrive as 'EXIT'
%% messages, handled in reporter_loop/1 and calc_reports/2.
process_flag(trap_exit, true),
DbNodes = mnesia:system_info(db_nodes),
if
RC#run_config.use_running_mnesia =:= true ->
ignore;
true ->
{Replies, BadNodes} =
rpc:multicall(DbNodes, mnesia, start, []),
case [Res || Res <- Replies, Res =/= ok] of
[] when BadNodes =:= [] ->
ok;
BadRes ->
io:format("TPC-B: <ERROR> "
"Failed to start ~w: ~p~n",
[BadNodes, BadRes]),
exit({start_failed, BadRes, BadNodes})
end,
%% NOTE(review): the result of this verification is not checked here
verify_tabs()
end,
%% Tellers and accounts per branch are derived from the table sizes
%% (integer division by the number of branches).
N = table_info(branch, size),
NT = table_info(teller, size) div N,
NA = table_info(account, size) div N,
{Type, NF, RepNodes} = table_storage(branch),
TC = #tab_config{n_fragments = NF,
n_branches = N,
n_tellers_per_branch = NT,
n_accounts_per_branch = NA,
db_nodes = DbNodes,
replica_nodes = RepNodes,
replica_type = Type
},
Drivers = start_drivers(RC, TC),
Now = now_to_micros(erlang:now()),
State = #reporter_state{driver_pids = Drivers,
run_config = RC,
starter_pid = Starter,
init_micros = Now,
prev_micros = Now
},
%% reporter_loop/1 only returns on a stop request; every other ending
%% is an exit, caught here as {'EXIT', Reason}.
case catch reporter_loop(State) of
{'EXIT', Reason} ->
io:format("TPC-B: Abnormal termination: ~p~n", [Reason]),
if
RC#run_config.use_running_mnesia =:= true ->
ignore;
true ->
rpc:multicall(DbNodes, mnesia, lkill, [])
end,
unlink(Starter),
Starter ! {self(), {stopped, {error, Reason}}}, % To be sure
exit(shutdown);
{ok, Stopper, State2} ->
%% Statistics accumulated over the whole run
Time = State2#reporter_state.acc,
Res =
case verify_tabs() of
ok ->
{ok, Time};
{error, Reason} ->
io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n",
[{error, Reason}]),
{error, Reason}
end,
if
RC#run_config.use_running_mnesia =:= true ->
ignore;
true ->
rpc:multicall(DbNodes, mnesia, stop, [])
end,
%% Unlink before exiting so the shutdown exit does not propagate to Starter
unlink(Starter),
Starter ! {self(), {stopped, Res}},
if
Stopper =/= Starter ->
Stopper ! {self(), {stopped, Res}};
true ->
ignore
end,
exit(shutdown)
end.
%% Reads mnesia table_info Item for Tab as a sync_dirty activity through
%% the mnesia_frag access module, so fragmented tables are handled too.
%% Callers also pass {Tab, Key} as Tab (e.g. get_branch_nodes/2).
table_info(Tab, Item) ->
    mnesia:activity(sync_dirty,
                    fun() -> mnesia:table_info(Tab, Item) end,
                    mnesia_frag).
%% Returns {Storage, NFragments, ReplicaNodes} for Tab.
%%   Storage      - ram_copies | disc_copies | disc_only_copies: the first
%%                  of these types (in that order) that has any replicas.
%%   NFragments   - the integer n_fragments, or 0 for a plain table.
%%   ReplicaNodes - nodes holding that storage type for a plain table, or
%%                  the fragmentation node_pool for a fragmented table.
%% Crashes with if_clause if Tab has no replicas at all.
table_storage(Tab) ->
    case mnesia:table_info(Tab, frag_properties) of
        [] ->
            Ram = mnesia:table_info(Tab, ram_copies),
            Disc = mnesia:table_info(Tab, disc_copies),
            DiscOnly = mnesia:table_info(Tab, disc_only_copies),
            if
                Ram =/= [] -> {ram_copies, 0, Ram};
                Disc =/= [] -> {disc_copies, 0, Disc};
                DiscOnly =/= [] -> {disc_only_copies, 0, DiscOnly}
            end;
        Props ->
            %% keyfind returns the {Key, Value} tuple; take just the value
            {n_fragments, NF} = lists:keyfind(n_fragments, 1, Props),
            {node_pool, Pool} = lists:keyfind(node_pool, 1, Props),
            NR = table_info(Tab, n_ram_copies),
            ND = table_info(Tab, n_disc_copies),
            NDO = table_info(Tab, n_disc_only_copies),
            if
                NR =/= 0 -> {ram_copies, NF, Pool};
                ND =/= 0 -> {disc_copies, NF, Pool};
                NDO =/= 0 -> {disc_only_copies, NF, Pool}
            end
    end.
%% Reporter main loop. Every report_interval ms it collects and prints the
%% drivers' interval statistics, then starts a fresh interval.
%% Returns {ok, Stopper, State} when asked to stop by {Stopper, stop}.
%% Exits if the starter dies or if no drivers remain.
reporter_loop(State) ->
    #reporter_state{run_config = RC, starter_pid = Starter} = State,
    Interval = RC#run_config.report_interval,
    receive
        {From, stop} ->
            {ok, From, call_drivers(State, stop)};
        {'EXIT', Starter, Reason} ->
            exit({starter_died, Starter, Reason})
    after Interval ->
        Ticked = State#reporter_state{n_iters = State#reporter_state.n_iters + 1},
        case call_drivers(Ticked, report) of
            #reporter_state{driver_pids = []} ->
                exit(drivers_died);
            Reported ->
                reporter_loop(Reported#reporter_state{curr = #time{}})
        end
    end.
%% Sends {self(), Msg} (report or stop) to every driver, merges their
%% replies into State and prints the interval report. On stop it also
%% prints a whole-run summary built from acc since init_micros.
%% Returns the state after the interval report.
call_drivers(State, Msg) ->
    Drivers = State#reporter_state.driver_pids,
    _ = [Pid ! {self(), Msg} || Pid <- Drivers],
    Reported = show_report(calc_reports(Drivers, State)),
    if
        Msg =:= stop ->
            #reporter_state{acc = Acc, init_micros = Init} = Reported,
            show_report(Reported#reporter_state{n_iters = 0,
                                                curr = Acc,
                                                prev_micros = Init});
        true ->
            ignore
    end,
    Reported.
%% Waits, in order, for one #time{} reply from each driver and adds it to
%% both the whole-run (acc) and current-interval (curr) totals.
%% Exits if the starter or any of these drivers dies meanwhile.
calc_reports([], State) ->
    State;
calc_reports([Driver | Rest], State) ->
    Starter = State#reporter_state.starter_pid,
    receive
        {'EXIT', Starter, Reason} ->
            exit({starter_died, Starter, Reason});
        {'EXIT', Driver, Reason} ->
            exit({driver_died, Driver, Reason});
        {Driver, Time} when is_record(Time, time) ->
            #reporter_state{acc = Acc, curr = Curr} = State,
            Next = State#reporter_state{acc = add_time(Acc, Time),
                                        curr = add_time(Curr, Time)},
            calc_reports(Rest, Next)
    end.
%% Merges statistics New into Acc. Transaction counts and times are
%% summed, max_n/max_time keep the maximum, and min_n keeps the smallest
%% non-zero transaction count (0 only if both values are 0).
add_time(Acc, New) ->
    #time{n_trans = NewN, acc_time = NewAccTime, max_time = NewMax} = New,
    #time{n_trans = OldN, min_n = OldMinN, max_n = OldMaxN,
          acc_time = OldAccTime, max_time = OldMax} = Acc,
    Acc#time{n_trans = NewN + OldN,
             min_n = lists:min([NewN, OldMinN] -- [0]),
             max_n = max(NewN, OldMaxN),
             acc_time = NewAccTime + OldAccTime,
             max_time = max(NewMax, OldMax)}.
-define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end).
%% Prints a report for the interval in State#reporter_state.curr and
%% returns State with prev_tps/prev_micros updated for the next one.
%% Tps assumes all live drivers run back to back at the average
%% transaction time; "brutto" figures use elapsed wall-clock time instead.
%% n_iters > 0 gives an interval line, n_iters = 0 a whole-run summary.
%% Any division that fails (e.g. no transactions yet) yields 0.
show_report(State) ->
    Now = now_to_micros(erlang:now()),
    #reporter_state{n_iters = Iters,
                    curr = Curr,
                    driver_pids = Drivers,
                    prev_tps = PrevTps,
                    prev_micros = PrevMicros} = State,
    #time{n_trans = N,
          acc_time = AccTime,
          max_time = Max,
          min_n = MinN,
          max_n = MaxN} = Curr,
    Avg = ?AVOID_DIV_ZERO(AccTime div N),
    Tps = ?AVOID_DIV_ZERO((?SECOND * length(Drivers)) div Avg),
    {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps),
    Unfairness = ?AVOID_DIV_ZERO(MaxN / MinN),
    BruttoAvg = ?AVOID_DIV_ZERO((Now - PrevMicros) div N),
    BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg),
    if
        Iters > 0 ->
            io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n",
                      [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]);
        true ->
            io:format("TPC-B: ~p (~p) transactions per second, "
                      "duration of longest transaction was ~p milliseconds~n",
                      [Tps, BruttoTps, Max div 1000])
    end,
    State#reporter_state{prev_tps = Tps, prev_micros = Now}.
%% Change in tps relative to the previous interval, as {SignPrefix, Diff}.
%% For the first interval (Iters =< 1) there is nothing to compare with,
%% so the result is {"", 0}. The prefix is "+" only for positive values;
%% negative ones already print their own minus sign.
signed_diff(Iters, Curr, Prev) when Iters > 1 ->
    sign(Curr - Prev);
signed_diff(_Iters, _Curr, _Prev) ->
    sign(0).

sign(N) ->
    Prefix = if N > 0 -> "+"; true -> "" end,
    {Prefix, N}.
%% Converts a {MegaSecs, Secs, MicroSecs} timestamp (erlang:now/0 format)
%% to microseconds since year 0 of the Gregorian calendar (UTC).
%% Only differences between such values are used by the reports.
now_to_micros({MegaSecs, Secs, MicroSecs}) ->
    WholeSeconds = calendar:now_to_datetime({MegaSecs, Secs, 0}),
    GregorianSecs = calendar:datetime_to_gregorian_seconds(WholeSeconds),
    GregorianSecs * ?SECOND + MicroSecs.
%% Spawns n_drivers_per_node linked drivers on each of driver_nodes and
%% returns their pids. alloc_local_branches/3 gives each branch to a
%% driver running on a node the branch is written to. Branches without
%% such a driver are printed as orphans. A driver that got no branch at
%% all uses every branch (see driver_init/2).
%%
%% Driver ids are unique over all driver nodes, so the
%% {DriverId, HistoryId} keys of history records written by drivers on
%% different nodes cannot collide.
start_drivers(RC, TC) ->
    LastHistoryId = table_info(history, size),
    Reuse = RC#run_config.reuse_history_id,
    DS = #driver_state{tab_config = TC,
                       run_config = RC,
                       n_local_branches = 0,
                       local_branches = [],
                       history_id = LastHistoryId,
                       reuse_history_id = Reuse},
    Nodes = RC#run_config.driver_nodes,
    NB = TC#tab_config.n_branches,
    First = 0,
    AllBranches = lists:seq(First, First + NB - 1),
    ND = RC#run_config.n_drivers_per_node,
    %% Sorting on {per-node index, node} interleaves the drivers of the
    %% different nodes in the order branch allocation visits them.
    PerNodeSpecs = lists:sort([DS#driver_state{driver_id = Id, driver_node = Node}
                               || Node <- Nodes, Id <- lists:seq(1, ND)]),
    %% Renumber in that same order: ids become unique and the order is kept
    Specs = [Spec#driver_state{driver_id = Id}
             || {Id, Spec} <- lists:zip(lists:seq(1, length(PerNodeSpecs)),
                                        PerNodeSpecs)],
    {Specs2, OrphanBranches} = alloc_local_branches(AllBranches, Specs, []),
    case length(OrphanBranches) of
        NOrphans when NOrphans =< 10 ->
            io:format("TPC-B: Orphan branches: ~p~n", [OrphanBranches]);
        NOrphans ->
            io:format("TPC-B: Orphan branches: ~p~n", [NOrphans])
    end,
    [spawn_link(Spec#driver_state.driver_node, ?MODULE, driver_init, [Spec, AllBranches])
     || Spec <- Specs2].
%% Gives each branch to the driver spec with the fewest local branches
%% among those running on a node the branch is written to. Ties go to the
%% first such spec in Specs. The updated spec is moved to the front of
%% Specs. Branches with no eligible driver are returned as orphans, most
%% recent first.
alloc_local_branches([], Specs, OrphanBranches) ->
    {Specs, OrphanBranches};
alloc_local_branches([BranchId | Rest], Specs, OrphanBranches) ->
    WriteNodes = table_info({branch, BranchId}, where_to_write),
    Eligible = [Spec || Spec <- Specs,
                        lists:member(Spec#driver_state.driver_node, WriteNodes)],
    case lists:keysort(#driver_state.n_local_branches, Eligible) of
        [] ->
            alloc_local_branches(Rest, Specs, [BranchId | OrphanBranches]);
        [Chosen | _] ->
            #driver_state{n_local_branches = Count,
                          local_branches = Owned} = Chosen,
            Updated = Chosen#driver_state{n_local_branches = Count + 1,
                                          local_branches = [BranchId | Owned]},
            alloc_local_branches(Rest, [Updated | Specs -- [Chosen]], OrphanBranches)
    end.
%% Entry point of a driver process. Seeds the driver's random generator
%% with the current time and, if no branches were assigned to it, makes
%% every branch local. Then enters driver_loop/1.
driver_init(DS, AllBranches) ->
    Seeded = DS#driver_state{seed = erlang:now()},
    DS2 =
        case DS#driver_state.n_local_branches of
            0 ->
                Seeded#driver_state{n_local_branches = length(AllBranches),
                                    local_branches = AllBranches};
            _ ->
                Seeded
        end,
    io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n",
              [DS2#driver_state.driver_id, self(), node(),
               DS2#driver_state.n_local_branches]),
    driver_loop(DS2).
%% Driver main loop: runs transactions back to back. {From, report} is
%% answered with the current interval's #time{}, which is then folded into
%% acc_time and reset. {From, stop} is answered the same way, after which
%% the driver unlinks from From and exits with reason stopped.
driver_loop(DS) ->
    receive
        {From, report} ->
            Interval = DS#driver_state.time,
            From ! {self(), Interval},
            Total = add_time(Interval, DS#driver_state.acc_time),
            Reset = DS#driver_state{time = #time{}, acc_time = Total},
            driver_loop(calc_trans(Reset));
        {From, stop} ->
            Interval = DS#driver_state.time,
            Total = add_time(Interval, DS#driver_state.acc_time),
            io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n",
                      [DS#driver_state.driver_id, self(), node(self()), Total]),
            From ! {self(), Interval},
            unlink(From),
            exit(stopped)
    after 0 ->
        driver_loop(calc_trans(DS))
    end.
%% Runs and times one transaction and folds its duration (microseconds)
%% into the current interval statistics. Unless history ids are reused,
%% the driver's history id is advanced for the next transaction.
calc_trans(DS) ->
    {Micros, Timed} = time_trans(DS),
    T = Timed#driver_state.time,
    #time{n_trans = N, acc_time = AccTime, max_time = MaxTime} = T,
    NewTime = T#time{n_trans = N + 1,
                     acc_time = AccTime + Micros,
                     max_time = max(Micros, MaxTime)},
    case DS#driver_state.reuse_history_id of
        true ->
            Timed#driver_state{time = NewTime};
        false ->
            Timed#driver_state{time = NewTime,
                               history_id = DS#driver_state.history_id + 1}
    end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Generate teller_id, account_id and delta
%% Time the TPC-B transaction
%%
%% Draws one uniform random number using the driver's own seed, turns it
%% into transaction arguments (random_to_args/2) and runs the transaction
%% through real_trans/5 under timer:tc/4. Returns {Micros, DS} with the
%% seed advanced. Exits with {crash, Res, Args, Random, DS} unless the
%% transaction returned an integer account balance.
time_trans(DS) ->
%% The driver's seed is placed in the process dictionary (random_seed)
%% only around random:uniform/0; any previous value is restored afterwards.
%% NOTE(review): the random module is deprecated in favour of rand;
%% rand:uniform_s/1 would avoid this process dictionary juggling.
OldSeed = get(random_seed), % Avoid interference with Mnesia
put(random_seed, DS#driver_state.seed),
Random = random:uniform(),
NewSeed = get(random_seed),
case OldSeed of
undefined -> erase(random_seed);
_ -> put(random_seed, OldSeed)
end,
TC = DS#driver_state.tab_config,
RC = DS#driver_state.run_config,
{Branchid, Args} = random_to_args(Random, DS),
{Fun, Mod} = trans_type(TC, RC),
{Time, Res} = timer:tc(?MODULE, real_trans, [RC, Branchid, Fun, Args, Mod]),
case Res of
AccountBal when is_integer(AccountBal) ->
{Time, DS#driver_state{seed = NewSeed}};
Other ->
exit({crash, Other, Args, Random, DS})
end.
%% Derives all transaction arguments from one uniform float Random,
%% presumably in (0.0, 1.0) as drawn by time_trans/1:
%%   Delta   - amount in -999999..999999
%%   Branch  - one of the driver's n_local_branches branches (for plain
%%             tables the ids 0..n_local_branches-1 are used directly)
%%   Teller  - one of the n_tellers_per_branch tellers of that branch
%%   Account - if Random >= 0.85 and there is more than one local branch,
%%             an account of another branch; otherwise one of the
%%             branch's own accounts
%% Returns {BranchId, [DriverId, BranchId, TellerId, AccountBranchId,
%% AccountId, HistoryId, Delta]}, the argument list of the *_add_delta/7
%% transaction functions.
random_to_args(Random, DS) ->
    DriverId = DS#driver_state.driver_id,
    TC = DS#driver_state.tab_config,
    HistoryId = DS#driver_state.history_id,
    Delta = trunc(Random * 1999998) - 999999, % -999999 <= Delta <= +999999
    Branches = DS#driver_state.local_branches,
    NB = DS#driver_state.n_local_branches,
    NT = TC#tab_config.n_tellers_per_branch,
    NA = TC#tab_config.n_accounts_per_branch,
    %% Tmp selects one of the NB * NT (branch, teller) pairs:
    %% Tmp div NT is the branch position, Tmp rem NT the teller in it.
    Tmp = trunc(Random * NB * NT),
    BranchPos = (Tmp div NT) + 1,
    BranchId =
        case TC#tab_config.n_fragments of
            0 -> BranchPos - 1;
            _ -> lists:nth(BranchPos, Branches)
        end,
    %% Must be rem: div would repeat the branch position, never use most
    %% tellers, and step outside the branch when NB exceeds NT.
    RelativeTellerId = Tmp rem NT,
    TellerId = (BranchId * NT) + RelativeTellerId,
    {AccountBranchId, AccountId} =
        if
            Random >= 0.85, NB > 1 ->
                %% Pick from a remote account
                TmpAccountId = trunc(Random * (NB - 1) * NA),
                TmpAccountBranchId = TmpAccountId div NA,
                if
                    TmpAccountBranchId =:= BranchId ->
                        {TmpAccountBranchId + 1, TmpAccountId + NA};
                    true ->
                        {TmpAccountBranchId, TmpAccountId}
                end;
            true ->
                %% Pick from a local account
                RelativeAccountId = trunc(Random * NA),
                TmpAccountId = (BranchId * NA) + RelativeAccountId,
                {BranchId, TmpAccountId}
        end,
    {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}.
%% Runs apply(Fun, Args) as a Mnesia activity of the configured
%% activity_type with access module Mod. With spawn_near_branch the
%% activity runs, via rpc, on the node BranchId's branch is read from;
%% a failed rpc exits with the badrpc reason.
real_trans(RC, BranchId, Fun, Args, Mod) ->
    #run_config{activity_type = Type, spawn_near_branch = NearBranch} = RC,
    case NearBranch of
        true ->
            ReadNode = table_info({branch, BranchId}, where_to_read),
            case rpc:call(ReadNode, mnesia, activity, [Type, Fun, Args, Mod]) of
                {badrpc, Reason} -> exit(Reason);
                Result -> Result
            end;
        false ->
            mnesia:activity(Type, Fun, Args, Mod)
    end.
%% Chooses the transaction function and the Mnesia access module:
%%   plain tables                 -> {add_delta/7, mnesia}
%%   plain tables, sticky locks   -> {sticky_add_delta/7, mnesia}
%%   fragmented tables            -> {frag_add_delta/7, mnesia_frag}
%% Sticky locks on fragmented tables are not supported. That combination
%% (or any other unexpected one) exits with a descriptive reason instead
%% of an anonymous if_clause.
trans_type(TC, RC) ->
    NFrags = TC#tab_config.n_fragments,
    Sticky = RC#run_config.use_sticky_locks,
    if
        NFrags =:= 0, Sticky =:= false ->
            {fun add_delta/7, mnesia};
        NFrags =:= 0, Sticky =:= true ->
            {fun sticky_add_delta/7, mnesia};
        NFrags > 0, Sticky =:= false ->
            {fun frag_add_delta/7, mnesia_frag};
        true ->
            exit({unsupported_trans_type,
                  [{n_fragments, NFrags}, {use_sticky_locks, Sticky}]})
    end.
%%
%% Runs the TPC-B defined transaction and returns NewAccountBalance.
%% Branch, teller and account are each read with a write lock (so the
%% lock is taken at read time), credited with Delta and written back.
%% A history record is then added. _AccountBranchId is not needed for
%% plain tables.
%%
add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
    [Branch] = mnesia:read(branch, BranchId, write),
    ok = mnesia:write(branch,
                      Branch#branch{balance = Branch#branch.balance + Delta},
                      write),
    [Teller] = mnesia:read(teller, TellerId, write),
    ok = mnesia:write(teller,
                      Teller#teller{balance = Teller#teller.balance + Delta},
                      write),
    [Account] = mnesia:read(account, AccountId, write),
    NewBalance = Account#account.balance + Delta,
    ok = mnesia:write(account, Account#account{balance = NewBalance}, write),
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id = TellerId,
                     branch_id = BranchId,
                     amount = Delta},
    ok = mnesia:write(history, Entry, write),
    NewBalance.
%% Sticky-lock variant of add_delta/7, returning the new account balance.
%% Records are read with ordinary read locks and written back with
%% sticky_write locks. This transaction would benefit from taking the
%% sticky write lock already at read time.
sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
    [Branch] = mnesia:read(branch, BranchId, read),
    ok = mnesia:write(branch,
                      Branch#branch{balance = Branch#branch.balance + Delta},
                      sticky_write),
    [Teller] = mnesia:read(teller, TellerId, read),
    ok = mnesia:write(teller,
                      Teller#teller{balance = Teller#teller.balance + Delta},
                      sticky_write),
    [Account] = mnesia:read(account, AccountId, read),
    NewBalance = Account#account.balance + Delta,
    ok = mnesia:write(account, Account#account{balance = NewBalance}, sticky_write),
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id = TellerId,
                     branch_id = BranchId,
                     amount = Delta},
    ok = mnesia:write(history, Entry, sticky_write),
    NewBalance.
%% Variant of add_delta/7 for fragmented tables, run with the mnesia_frag
%% access module; returns the new account balance. Records are read with
%% write locks. Teller and account are read as {Tab, BranchKey}: tellers
%% with BranchId, the account with AccountBranchId (the branch holding
%% it), matching the branch foreign key the tables were created with.
frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) ->
    [Branch] = mnesia:read(branch, BranchId, write),
    ok = mnesia:write(Branch#branch{balance = Branch#branch.balance + Delta}),
    [Teller] = mnesia:read({teller, BranchId}, TellerId, write),
    ok = mnesia:write(Teller#teller{balance = Teller#teller.balance + Delta}),
    [Account] = mnesia:read({account, AccountBranchId}, AccountId, write),
    NewBalance = Account#account.balance + Delta,
    ok = mnesia:write(Account#account{balance = NewBalance}),
    Entry = #history{history_id = {DriverId, HistoryId},
                     account_id = AccountId,
                     teller_id = TellerId,
                     branch_id = BranchId,
                     amount = Delta},
    ok = mnesia:write(Entry),
    NewBalance.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Verify table consistency
%% Waits for the four tables on every running db node, then checks in one
%% transaction (holding write locks on all four tables) that every node
%% reports the same balance total for branch, teller and account
%% (count_balance/0, check_balance/2). Must be called on a running db node.
%% Returns ok or {error, Reason}.
verify_tabs() ->
    Running = mnesia:system_info(running_db_nodes),
    case lists:member(node(), Running) of
        false ->
            {error, "Must be initiated from a running db_node"};
        true ->
            Tabs = [branch, teller, account, history],
            io:format("TPC-B: Verifying tables: ~w~n", [Tabs]),
            rpc:multicall(Running, mnesia, wait_for_tables, [Tabs, infinity]),
            Check = fun() ->
                            lists:foreach(fun mnesia:write_lock_table/1, Tabs),
                            {Replies, BadNodes} =
                                rpc:multicall(Running, ?MODULE, count_balance, []),
                            check_balance(Replies, BadNodes)
                    end,
            case mnesia:transaction(Check) of
                {atomic, Res} -> Res;
                {aborted, Reason} -> {error, Reason}
            end
    end.
%% count_balance/0 returns one #summary{} per table (branch, teller,
%% account): the table's total balance and row count over all its
%% fragments, as seen by dirty reads on this node.
%% Assumes that no updates are performed meanwhile.
-record(summary, {table, node, balance, size}).
count_balance() ->
    [count_balance(Tab, BalPos) || {Tab, BalPos} <- [{branch, #branch.balance},
                                                     {teller, #teller.balance},
                                                     {account, #account.balance}]].

%% Sums element BalPos over every fragment of Tab.
count_balance(Tab, BalPos) ->
    Frags = table_info(Tab, frag_names),
    count_balance(Tab, Frags, 0, 0, BalPos).

%% Adds the balances and row counts of Frags to Bal/Size and wraps the
%% totals in a #summary{} for this node.
count_balance(Tab, Frags, Bal, Size, BalPos) ->
    {TotalBal, TotalSize} =
        lists:foldl(fun(Frag, {AccBal, AccSize}) ->
                            count_frag_balance(Frag, mnesia:dirty_first(Frag),
                                               AccBal, AccSize, BalPos)
                    end,
                    {Bal, Size}, Frags),
    #summary{table = Tab, node = node(), balance = TotalBal, size = TotalSize}.

%% Walks Frag with dirty_next from Key, adding element BalPos of each
%% record to Bal and counting it in Size.
count_frag_balance(Frag, Key, Bal, Size, BalPos) ->
    case Key of
        '$end_of_table' ->
            {Bal, Size};
        _ ->
            [Record] = mnesia:dirty_read({Frag, Key}),
            NewBal = Bal + element(BalPos, Record),
            Next = mnesia:dirty_next(Frag, Key),
            count_frag_balance(Frag, Next, NewBal, Size + 1, BalPos)
    end.
%% Checks the {Replies, BadNodes} result of the count_balance/0 multicall.
%% Each reply should be a list of #summary{}; every branch, teller and
%% account total from every node must equal the first one. Returns ok,
%% otherwise aborts the enclosing transaction with one of:
%%   {"No balance"}, {"Bad nodes", BadNodes}, {"Bad rpc", FailedReplies},
%%   {"Bad balance", FirstSummary, MismatchingSummaries}.
check_balance([], []) ->
    mnesia:abort({"No balance"});
check_balance(Replies, []) ->
    %% A node where count_balance/0 crashed answers {badrpc, Reason}
    %% instead of a list; reject it explicitly instead of failing on
    %% #summary{} field access.
    case [Reply || Reply <- Replies, not is_list(Reply)] of
        [] ->
            [One | Rest] = lists:flatten(Replies),
            Balance = One#summary.balance,
            case [S || S <- Rest, S#summary.balance =/= Balance] of
                [] ->
                    ok;
                BadSummaries ->
                    mnesia:abort({"Bad balance", One, BadSummaries})
            end;
        FailedReplies ->
            mnesia:abort({"Bad rpc", FailedReplies})
    end;
check_balance(_, BadNodes) ->
    mnesia:abort({"Bad nodes", BadNodes}).