%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1996-2010. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% %% MODULE %% %% mnesia_tpcb - TPC-B benchmarking of Mnesia %% %% DESCRIPTION %% %% The metrics used in the TPC-B benchmark are throughput as measured %% in transactions per second (TPS). The benchmark uses a single, %% simple update-intensive transaction to load the database system. %% The single transaction type provides a simple, repeatable %% unit of work, and is designed to exercise the basic components of %% a database system. %% %% The definition of the TPC-B states lots of detailed rules and %% conditions that must be fullfilled, e.g. how the ACID (atomicity, %% consistency, isolation and durability) properties are verified, %% how the random numbers must be distributed, minimum sizes of %% the different types of records, minimum duration of the benchmark, %% formulas to calculate prices (dollars per tps), disclosure issues %% etc. Please, see http://www.tpc.org/ about the nitty gritty details. %% %% The TPC-B benchmark is stated in terms of a hypothetical bank. The %% bank has one or more branches. Each branch has multiple tellers. The %% bank has many customers, each with an account. The database represents %% the cash position of each entity (branch, teller and account) and a %% history of recent transactions run by the bank. The transaction %% represents the work done when a customer makes a deposit or a %% withdrawal against his account. The transaction is performed by a %% teller at some branch. %% %% Each process that performs TPC-B transactions is called a driver. %% Drivers generates teller_id, account_id and delta amount of %% money randomly. An account, a teller and a branch are read, their %% balances are adjusted and a history record is created. The driver %% measures the time for 3 reads, 3 writes and 1 create. %% %% GETTING STARTED %% %% Generate tables and run with default configuration: %% %% mnesia_tpcb:start(). %% %% A little bit more advanced; %% %% spawn(mnesia_tpcb, start, [[[{n_drivers_per_node, 8}, {stop_after, infinity}]]), %% mnesia_tpcb:stop(). %% %% Really advanced; %% %% mnesia_tpcb:init(([{n_branches, 8}, {replica_type, disc_only_copies}]), %% mnesia_tpcb:run(([{n_drivers_per_node, 8}]), %% mnesia_tpcb:run(([{n_drivers_per_node, 64}]). %% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -module(mnesia_tpcb). -author('hakan@erix.ericsson.se'). -export([ config/2, count_balance/0, driver_init/2, init/1, reporter_init/2, run/1, start/0, start/1, start/2, stop/0, real_trans/5, verify_tabs/0, reply_gen_branch/3, frag_add_delta/7, conflict_test/1, dist_test/1, replica_test/1, sticky_replica_test/1, remote_test/1, remote_frag2_test/1 ]). -define(SECOND, 1000000). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Account record, total size must be at least 100 bytes -define(ACCOUNT_FILLER, {123456789012345678901234567890123456789012345678901234567890, 123456789012345678901234567890123456789012345678901234567890, 123456789012345678901234567890123456789012345678901234}). -record(account, { id = 0, % Unique account id branch_id = 0, % Branch where the account is held balance = 0, % Account balance filler = ?ACCOUNT_FILLER % Gap filler to ensure size >= 100 bytes }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Branch record, total size must be at least 100 bytes -define(BRANCH_FILLER, {123456789012345678901234567890123456789012345678901234567890, 123456789012345678901234567890123456789012345678901234567890, 123456789012345678901234567890123456789012345678901234567890}). -record(branch, { id = 0, % Unique branch id balance = 0, % Total balance of whole branch filler = ?BRANCH_FILLER % Gap filler to ensure size >= 100 bytes }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Teller record, total size must be at least 100 bytes -define(TELLER_FILLER, {123456789012345678901234567890123456789012345678901234567890, 123456789012345678901234567890123456789012345678901234567890, 1234567890123456789012345678901234567890123456789012345678}). -record(teller, { id = 0, % Unique teller id branch_id = 0, % Branch where the teller is located balance = 0, % Teller balance filler = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% History record, total size must be at least 50 bytes -define(HISTORY_FILLER, 1234567890). -record(history, { history_id = {0, 0}, % {DriverId, DriverLocalHistoryid} time_stamp = now(), % Time point during active transaction branch_id = 0, % Branch associated with teller teller_id = 0, % Teller invlolved in transaction account_id = 0, % Account updated by transaction amount = 0, % Amount (delta) specified by transaction filler = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(tab_config, { db_nodes = [node()], n_replicas = 1, % Ignored for non-fragmented tables replica_nodes = [node()], replica_type = ram_copies, use_running_mnesia = false, n_fragments = 0, n_branches = 1, n_tellers_per_branch = 10, % Must be 10 n_accounts_per_branch = 100000, % Must be 100000 branch_filler = ?BRANCH_FILLER, account_filler = ?ACCOUNT_FILLER, teller_filler = ?TELLER_FILLER }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(run_config, { driver_nodes = [node()], n_drivers_per_node = 1, use_running_mnesia = false, stop_after = timer:minutes(15), % Minimum 15 min report_interval = timer:minutes(1), use_sticky_locks = false, spawn_near_branch = false, activity_type = transaction, reuse_history_id = false }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(time, { n_trans = 0, min_n = 0, max_n = 0, acc_time = 0, max_time = 0 }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(driver_state, { driver_id, driver_node, seed, n_local_branches, local_branches, tab_config, run_config, history_id, time = #time{}, acc_time = #time{}, reuse_history_id }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -record(reporter_state, { driver_pids, starter_pid, n_iters = 0, prev_tps = 0, curr = #time{}, acc = #time{}, init_micros, prev_micros, run_config }). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on each node, table not replicated config(frag_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {n_branches, length(Nodes)}, {n_fragments, length(Nodes)}, {replica_nodes, Nodes}, {db_nodes, Nodes}, {driver_nodes, Nodes}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on each node, table replicated to two nodes. config(frag2_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {n_branches, length(Nodes)}, {n_fragments, length(Nodes)}, {n_replicas, 2}, {replica_nodes, Nodes}, {db_nodes, Nodes}, {driver_nodes, Nodes}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on this node, table replicated to all nodes. config(replica_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {db_nodes, Nodes}, {driver_nodes, [Local]}, {replica_nodes, Nodes}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on this node, table replicated to all nodes. config(sticky_replica_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {db_nodes, Nodes}, {driver_nodes, [node()]}, {replica_nodes, Nodes}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {use_sticky_locks, true}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Ten drivers per node, tables replicated to all nodes, lots of branches config(dist_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {db_nodes, Nodes}, {driver_nodes, Nodes}, {replica_nodes, Nodes}, {n_drivers_per_node, 10}, {n_branches, 10 * length(Nodes) * 100}, {n_accounts_per_branch, 10}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Ten drivers per node, tables replicated to all nodes, single branch config(conflict_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {db_nodes, Nodes}, {driver_nodes, Nodes}, {replica_nodes, Nodes}, {n_drivers_per_node, 10}, {n_branches, 1}, {n_accounts_per_branch, 10}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on this node, table replicated to all other nodes. config(remote_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {db_nodes, Nodes}, {driver_nodes, [Local]}, {replica_nodes, Remote}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]; %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% One driver on this node, table replicated to two other nodes. config(remote_frag2_test, ReplicaType) -> Remote = nodes(), Local = node(), Nodes = [Local | Remote], [ {n_branches, length(Remote)}, {n_fragments, length(Remote)}, {n_replicas, 2}, {replica_nodes, Remote}, {db_nodes, Nodes}, {driver_nodes, [Local]}, {n_accounts_per_branch, 100}, {replica_type, ReplicaType}, {stop_after, timer:minutes(1)}, {report_interval, timer:seconds(10)}, {reuse_history_id, true} ]. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% start(What, ReplicaType) -> spawn_link(?MODULE, start, [config(What, ReplicaType)]). replica_test(ReplicaType) -> start(replica_test, ReplicaType). sticky_replica_test(ReplicaType) -> start(sticky_replica_test, ReplicaType). dist_test(ReplicaType) -> start(dist_test, ReplicaType). conflict_test(ReplicaType) -> start(conflict_test, ReplicaType). remote_test(ReplicaType) -> start(remote_test, ReplicaType). remote_frag2_test(ReplicaType) -> start(remote_frag2_test, ReplicaType). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Args is a list of {Key, Val} tuples where Key is a field name %% in either the record tab_config or run_config. Unknown keys are ignored. start() -> start([]). start(Args) -> init(Args), run(Args). list2rec(List, Fields, DefaultTuple) -> [Name|Defaults] = tuple_to_list(DefaultTuple), List2 = list2rec(List, Fields, Defaults, []), list_to_tuple([Name] ++ List2). list2rec(_List, [], [], Acc) -> Acc; list2rec(List, [F|Fields], [D|Defaults], Acc) -> {Val, List2} = case lists:keysearch(F, 1, List) of false -> {D, List}; {value, {F, NewVal}} -> {NewVal, lists:keydelete(F, 1, List)} end, list2rec(List2, Fields, Defaults, Acc ++ [Val]). stop() -> case whereis(mnesia_tpcb) of undefined -> {error, not_running}; Pid -> sync_stop(Pid) end. sync_stop(Pid) -> Pid ! {self(), stop}, receive {Pid, {stopped, Res}} -> Res after timer:minutes(1) -> exit(Pid, kill), {error, brutal_kill} end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Initialization %% Args is a list of {Key, Val} tuples where Key is a field name %% in the record tab_config, unknown keys are ignored. init(Args) -> TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}), TabConfig = if TabConfig0#tab_config.n_fragments =:= 0 -> TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)}; true -> TabConfig0 end, Tags = record_info(fields, tab_config), Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end, {List, _} = lists:mapfoldl(Fun, 2, Tags), io:format("TPC-B: Table config: ~p ~n", [List]), DbNodes = TabConfig#tab_config.db_nodes, stop(), if TabConfig#tab_config.use_running_mnesia =:= true -> ignore; true -> rpc:multicall(DbNodes, mnesia, lkill, []), case mnesia:delete_schema(DbNodes) of ok -> case mnesia:create_schema(DbNodes) of ok -> {Replies, BadNodes} = rpc:multicall(DbNodes, mnesia, start, []), case [Res || Res <- Replies, Res =/= ok] of [] when BadNodes =:= [] -> ok; BadRes -> io:format("TPC-B: <ERROR> " "Failed to start ~p: ~p~n", [BadNodes, BadRes]), exit({start_failed, BadRes, BadNodes}) end; {error, Reason} -> io:format("TPC-B: <ERROR> " "Failed to create schema on disc: ~p~n", [Reason]), exit({create_schema_failed, Reason}) end; {error, Reason} -> io:format("TPC-B: <ERROR> " "Failed to delete schema on disc: ~p~n", [Reason]), exit({delete_schema_failed, Reason}) end end, gen_tabs(TabConfig). gen_tabs(TC) -> create_tab(TC, branch, record_info(fields, branch), undefined), create_tab(TC, account, record_info(fields, account), {branch, #account.branch_id}), create_tab(TC, teller, record_info(fields, teller), {branch, #teller.branch_id}), create_tab(TC, history, record_info(fields, history), {branch, #history.branch_id}), NB = TC#tab_config.n_branches, NT = TC#tab_config.n_tellers_per_branch, NA = TC#tab_config.n_accounts_per_branch, io:format("TPC-B: Generating ~p branches a ~p bytes~n", [NB, size(term_to_binary(default_branch(TC)))]), io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n", [NB, NT, size(term_to_binary(default_teller(TC)))]), io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n", [NB, NA, size(term_to_binary(default_account(TC)))]), io:format("TPC-B: Generating 0 history records a ~p bytes~n", [size(term_to_binary(default_history(TC)))]), gen_branches(TC), case verify_tabs() of ok -> ignore; {error, Reason} -> io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n", [Reason]), exit({inconsistent_tables, Reason}) end. create_tab(TC, Name, Attrs, _ForeignKey) when TC#tab_config.n_fragments =:= 0 -> Nodes = TC#tab_config.replica_nodes, Type = TC#tab_config.replica_type, Def = [{Type, Nodes}, {attributes, Attrs}], create_tab(Name, Def); create_tab(TC, Name, Attrs, ForeignKey) -> NReplicas = TC#tab_config.n_replicas, NodePool = TC#tab_config.replica_nodes, Type = TC#tab_config.replica_type, NF = TC#tab_config.n_fragments, Props = [{n_fragments, NF}, {node_pool, NodePool}, {n_copies(Type), NReplicas}, {foreign_key, ForeignKey}], Def = [{frag_properties, Props}, {attributes, Attrs}], create_tab(Name, Def). create_tab(Name, Def) -> mnesia:delete_table(Name), case mnesia:create_table(Name, Def) of {atomic, ok} -> ok; {aborted, Reason} -> io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n", [Name, Def, Reason]), exit({create_table_failed, Reason}) end. n_copies(Type) -> case Type of ram_copies -> n_ram_copies; disc_copies -> n_disc_copies; disc_only_copies -> n_disc_only_copies end. gen_branches(TC) -> First = 0, Last = First + TC#tab_config.n_branches - 1, GenPids = gen_branches(TC, First, Last, []), wait_for_gen(GenPids). wait_for_gen([]) -> ok; wait_for_gen(Pids) -> receive {branch_generated, Pid} -> wait_for_gen(lists:delete(Pid, Pids)); Exit -> exit({tpcb_failed, Exit}) end. gen_branches(TC, BranchId, Last, UsedNs) when BranchId =< Last -> UsedNs2 = get_branch_nodes(BranchId, UsedNs), Node = hd(UsedNs2), Pid = spawn_link(Node, ?MODULE, reply_gen_branch, [self(), TC, BranchId]), [Pid | gen_branches(TC, BranchId + 1, Last, UsedNs2)]; gen_branches(_, _, _, _) -> []. reply_gen_branch(ReplyTo, TC, BranchId) -> gen_branch(TC, BranchId), ReplyTo ! {branch_generated, self()}, unlink(ReplyTo). %% Returns a new list of nodes with the best node as head get_branch_nodes(BranchId, UsedNs) -> WriteNs = table_info({branch, BranchId}, where_to_write), WeightedNs = [{n_duplicates(N, UsedNs, 0), N} || N <- WriteNs], [{_, LeastUsed} | _ ] = lists:sort(WeightedNs), [LeastUsed | UsedNs]. n_duplicates(_N, [], Count) -> Count; n_duplicates(N, [N | Tail], Count) -> n_duplicates(N, Tail, Count + 1); n_duplicates(N, [_ | Tail], Count) -> n_duplicates(N, Tail, Count). gen_branch(TC, BranchId) -> A = default_account(TC), NA = TC#tab_config.n_accounts_per_branch, FirstA = BranchId * NA, ArgsA = [FirstA, FirstA + NA - 1, BranchId, A], ok = mnesia:activity(async_dirty, fun gen_accounts/4, ArgsA, mnesia_frag), T = default_teller(TC), NT = TC#tab_config.n_tellers_per_branch, FirstT = BranchId * NT, ArgsT = [FirstT, FirstT + NT - 1, BranchId, T], ok = mnesia:activity(async_dirty, fun gen_tellers/4, ArgsT, mnesia_frag), B = default_branch(TC), FunB = fun() -> mnesia:write(branch, B#branch{id = BranchId}, write) end, ok = mnesia:activity(sync_dirty, FunB, [], mnesia_frag). gen_tellers(Id, Last, BranchId, T) when Id =< Last -> mnesia:write(teller, T#teller{id = Id, branch_id=BranchId}, write), gen_tellers(Id + 1, Last, BranchId, T); gen_tellers(_, _, _, _) -> ok. gen_accounts(Id, Last, BranchId, A) when Id =< Last -> mnesia:write(account, A#account{id = Id, branch_id=BranchId}, write), gen_accounts(Id + 1, Last, BranchId, A); gen_accounts(_, _, _, _) -> ok. default_branch(TC) -> #branch{filler = TC#tab_config.branch_filler}. default_teller(TC) -> #teller{filler = TC#tab_config.teller_filler}. default_account(TC) -> #account{filler = TC#tab_config.account_filler}. default_history(_TC) -> #history{}. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%% Run the benchmark %% Args is a list of {Key, Val} tuples where Key is a field name %% in the record run_config, unknown keys are ignored. run(Args) -> RunConfig = list2rec(Args, record_info(fields, run_config), #run_config{}), Tags = record_info(fields, run_config), Fun = fun(F, Pos) -> {{F, element(Pos, RunConfig)}, Pos + 1} end, {List, _} = lists:mapfoldl(Fun, 2, Tags), io:format("TPC-B: Run config: ~p ~n", [List]), Pid = spawn_link(?MODULE, reporter_init, [self(), RunConfig]), receive {Pid, {stopped, Res}} -> Res; % Stopped by other process Else -> {tpcb_got, Else} after RunConfig#run_config.stop_after -> sync_stop(Pid) end. reporter_init(Starter, RC) -> register(mnesia_tpcb, self()), process_flag(trap_exit, true), DbNodes = mnesia:system_info(db_nodes), if RC#run_config.use_running_mnesia =:= true -> ignore; true -> {Replies, BadNodes} = rpc:multicall(DbNodes, mnesia, start, []), case [Res || Res <- Replies, Res =/= ok] of [] when BadNodes =:= [] -> ok; BadRes -> io:format("TPC-B: <ERROR> " "Failed to start ~w: ~p~n", [BadNodes, BadRes]), exit({start_failed, BadRes, BadNodes}) end, verify_tabs() end, N = table_info(branch, size), NT = table_info(teller, size) div N, NA = table_info(account, size) div N, {Type, NF, RepNodes} = table_storage(branch), TC = #tab_config{n_fragments = NF, n_branches = N, n_tellers_per_branch = NT, n_accounts_per_branch = NA, db_nodes = DbNodes, replica_nodes = RepNodes, replica_type = Type }, Drivers = start_drivers(RC, TC), Now = now_to_micros(erlang:now()), State = #reporter_state{driver_pids = Drivers, run_config = RC, starter_pid = Starter, init_micros = Now, prev_micros = Now }, case catch reporter_loop(State) of {'EXIT', Reason} -> io:format("TPC-B: Abnormal termination: ~p~n", [Reason]), if RC#run_config.use_running_mnesia =:= true -> ignore; true -> rpc:multicall(DbNodes, mnesia, lkill, []) end, unlink(Starter), Starter ! {self(), {stopped, {error, Reason}}}, % To be sure exit(shutdown); {ok, Stopper, State2} -> Time = State2#reporter_state.acc, Res = case verify_tabs() of ok -> {ok, Time}; {error, Reason} -> io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n", [{error, Reason}]), {error, Reason} end, if RC#run_config.use_running_mnesia =:= true -> ignore; true -> rpc:multicall(DbNodes, mnesia, stop, []) end, unlink(Starter), Starter ! {self(), {stopped, Res}}, if Stopper =/= Starter -> Stopper ! {self(), {stopped, Res}}; true -> ignore end, exit(shutdown) end. table_info(Tab, Item) -> Fun = fun() -> mnesia:table_info(Tab, Item) end, mnesia:activity(sync_dirty, Fun, mnesia_frag). %% Returns {Storage, NFragments, ReplicaNodes} table_storage(Tab) -> case mnesia:table_info(branch, frag_properties) of [] -> NFO = 0, NR = length(mnesia:table_info(Tab, ram_copies)), ND = length(mnesia:table_info(Tab, disc_copies)), NDO = length(mnesia:table_info(Tab, disc_only_copies)), if NR =/= 0 -> {ram_copies, NFO, NR}; ND =/= 0 -> {disc_copies, NFO, ND}; NDO =/= 0 -> {disc_copies, NFO, NDO} end; Props -> {value, NFO} = lists:keysearch(n_fragments, 1, Props), NR = table_info(Tab, n_ram_copies), ND = table_info(Tab, n_disc_copies), NDO = table_info(Tab, n_disc_only_copies), if NR =/= 0 -> {ram_copies, NFO, NR}; ND =/= 0 -> {disc_copies, NFO, ND}; NDO =/= 0 -> {disc_copies, NFO, NDO} end end. reporter_loop(State) -> RC = State#reporter_state.run_config, receive {From, stop} -> {ok, From, call_drivers(State, stop)}; {'EXIT', Pid, Reason} when Pid =:= State#reporter_state.starter_pid -> %% call_drivers(State, stop), exit({starter_died, Pid, Reason}) after RC#run_config.report_interval -> Iters = State#reporter_state.n_iters, State2 = State#reporter_state{n_iters = Iters + 1}, case call_drivers(State2, report) of State3 when State3#reporter_state.driver_pids =/= [] -> State4 = State3#reporter_state{curr = #time{}}, reporter_loop(State4); _ -> exit(drivers_died) end end. call_drivers(State, Msg) -> Drivers = State#reporter_state.driver_pids, lists:foreach(fun(Pid) -> Pid ! {self(), Msg} end, Drivers), State2 = show_report(calc_reports(Drivers, State)), case Msg =:= stop of true -> Acc = State2#reporter_state.acc, Init = State2#reporter_state.init_micros, show_report(State2#reporter_state{n_iters = 0, curr = Acc, prev_micros = Init}); false -> ignore end, State2. calc_reports([], State) -> State; calc_reports([Pid|Drivers], State) -> receive {'EXIT', P, Reason} when P =:= State#reporter_state.starter_pid -> exit({starter_died, P, Reason}); {'EXIT', Pid, Reason} -> exit({driver_died, Pid, Reason}); {Pid, Time} when is_record(Time, time) -> %% io:format("~w: ~w~n", [Pid, Time]), A = add_time(State#reporter_state.acc, Time), C = add_time(State#reporter_state.curr, Time), State2 = State#reporter_state{acc = A, curr = C}, calc_reports(Drivers, State2) end. add_time(Acc, New) -> Acc#time{n_trans = New#time.n_trans + Acc#time.n_trans, min_n = lists:min([New#time.n_trans, Acc#time.min_n] -- [0]), max_n = lists:max([New#time.n_trans, Acc#time.max_n]), acc_time = New#time.acc_time + Acc#time.acc_time, max_time = lists:max([New#time.max_time, Acc#time.max_time])}. -define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end). show_report(State) -> Now = now_to_micros(erlang:now()), Iters = State#reporter_state.n_iters, Time = State#reporter_state.curr, Max = Time#time.max_time, N = Time#time.n_trans, Avg = ?AVOID_DIV_ZERO(Time#time.acc_time div N), AliveN = length(State#reporter_state.driver_pids), Tps = ?AVOID_DIV_ZERO((?SECOND * AliveN) div Avg), PrevTps= State#reporter_state.prev_tps, {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps), Unfairness = ?AVOID_DIV_ZERO(Time#time.max_n / Time#time.min_n), BruttoAvg = ?AVOID_DIV_ZERO((Now - State#reporter_state.prev_micros) div N), %% io:format("n_iters=~p, n_trans=~p, n_drivers=~p, avg=~p, now=~p, prev=~p~n", %% [Iters, N, AliveN, BruttoAvg, Now, State#reporter_state.prev_micros]), BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg), case Iters > 0 of true -> io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n", [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]); false -> io:format("TPC-B: ~p (~p) transactions per second, " "duration of longest transaction was ~p milliseconds~n", [Tps, BruttoTps, Max div 1000]) end, State#reporter_state{prev_tps = Tps, prev_micros = Now}. signed_diff(Iters, Curr, Prev) -> case Iters > 1 of true -> sign(Curr - Prev); false -> sign(0) end. sign(N) when N > 0 -> {"+", N}; sign(N) -> {"", N}. now_to_micros({Mega, Secs, Micros}) -> DT = calendar:now_to_datetime({Mega, Secs, 0}), S = calendar:datetime_to_gregorian_seconds(DT), (S * ?SECOND) + Micros. start_drivers(RC, TC) -> LastHistoryId = table_info(history, size), Reuse = RC#run_config.reuse_history_id, DS = #driver_state{tab_config = TC, run_config = RC, n_local_branches = 0, local_branches = [], history_id = LastHistoryId, reuse_history_id = Reuse}, Nodes = RC#run_config.driver_nodes, NB = TC#tab_config.n_branches, First = 0, AllBranches = lists:seq(First, First + NB - 1), ND = RC#run_config.n_drivers_per_node, Spawn = fun(Spec) -> Node = Spec#driver_state.driver_node, spawn_link(Node, ?MODULE, driver_init, [Spec, AllBranches]) end, Specs = [DS#driver_state{driver_id = Id, driver_node = N} || N <- Nodes, Id <- lists:seq(1, ND)], Specs2 = lists:sort(lists:flatten(Specs)), {Specs3, OrphanBranches} = alloc_local_branches(AllBranches, Specs2, []), case length(OrphanBranches) of N when N =< 10 -> io:format("TPC-B: Orphan branches: ~p~n", [OrphanBranches]); N -> io:format("TPC-B: Orphan branches: ~p~n", [N]) end, [Spawn(Spec) || Spec <- Specs3]. alloc_local_branches([BranchId | Tail], Specs, OrphanBranches) -> Nodes = table_info({branch, BranchId}, where_to_write), LocalSpecs = [DS || DS <- Specs, lists:member(DS#driver_state.driver_node, Nodes)], case lists:keysort(#driver_state.n_local_branches, LocalSpecs) of [] -> alloc_local_branches(Tail, Specs, [BranchId | OrphanBranches]); [DS | _] -> LocalNB = DS#driver_state.n_local_branches + 1, LocalBranches = [BranchId | DS#driver_state.local_branches], DS2 = DS#driver_state{n_local_branches = LocalNB, local_branches = LocalBranches}, Specs2 = Specs -- [DS], Specs3 = [DS2 | Specs2], alloc_local_branches(Tail, Specs3, OrphanBranches) end; alloc_local_branches([], Specs, OrphanBranches) -> {Specs, OrphanBranches}. driver_init(DS, AllBranches) -> Seed = erlang:now(), DS2 = if DS#driver_state.n_local_branches =:= 0 -> DS#driver_state{seed = Seed, n_local_branches = length(AllBranches), local_branches = AllBranches}; true -> DS#driver_state{seed = Seed} end, io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n", [DS2#driver_state.driver_id, self(), node(), DS2#driver_state.n_local_branches]), driver_loop(DS2). driver_loop(DS) -> receive {From, report} -> From ! {self(), DS#driver_state.time}, Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time), DS2 = DS#driver_state{time=#time{}, acc_time = Acc}, % Reset timer DS3 = calc_trans(DS2), driver_loop(DS3); {From, stop} -> Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time), io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n", [DS#driver_state.driver_id, self(), node(self()), Acc]), From ! {self(), DS#driver_state.time}, unlink(From), exit(stopped) after 0 -> DS2 = calc_trans(DS), driver_loop(DS2) end. calc_trans(DS) -> {Micros, DS2} = time_trans(DS), Time = DS2#driver_state.time, Time2 = Time#time{n_trans = Time#time.n_trans + 1, acc_time = Time#time.acc_time + Micros, max_time = lists:max([Micros, Time#time.max_time]) }, case DS#driver_state.reuse_history_id of false -> HistoryId = DS#driver_state.history_id + 1, DS2#driver_state{time=Time2, history_id = HistoryId}; true -> DS2#driver_state{time=Time2} end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Generate teller_id, account_id and delta %% Time the TPC-B transaction time_trans(DS) -> OldSeed = get(random_seed), % Avoid interference with Mnesia put(random_seed, DS#driver_state.seed), Random = random:uniform(), NewSeed = get(random_seed), case OldSeed of undefined -> erase(random_seed); _ -> put(random_seed, OldSeed) end, TC = DS#driver_state.tab_config, RC = DS#driver_state.run_config, {Branchid, Args} = random_to_args(Random, DS), {Fun, Mod} = trans_type(TC, RC), {Time, Res} = timer:tc(?MODULE, real_trans, [RC, Branchid, Fun, Args, Mod]), case Res of AccountBal when is_integer(AccountBal) -> {Time, DS#driver_state{seed = NewSeed}}; Other -> exit({crash, Other, Args, Random, DS}) end. random_to_args(Random, DS) -> DriverId = DS#driver_state.driver_id, TC = DS#driver_state.tab_config, HistoryId = DS#driver_state.history_id, Delta = trunc(Random * 1999998) - 999999, % -999999 <= Delta <= +999999 Branches = DS#driver_state.local_branches, NB = DS#driver_state.n_local_branches, NT = TC#tab_config.n_tellers_per_branch, NA = TC#tab_config.n_accounts_per_branch, Tmp = trunc(Random * NB * NT), BranchPos = (Tmp div NT) + 1, BranchId = case TC#tab_config.n_fragments of 0 -> BranchPos - 1; _ -> lists:nth(BranchPos, Branches) end, RelativeTellerId = Tmp div NT, TellerId = (BranchId * NT) + RelativeTellerId, {AccountBranchId, AccountId} = if Random >= 0.85, NB > 1 -> %% Pick from a remote account TmpAccountId= trunc(Random * (NB - 1) * NA), TmpAccountBranchId = TmpAccountId div NA, if TmpAccountBranchId =:= BranchId -> {TmpAccountBranchId + 1, TmpAccountId + NA}; true -> {TmpAccountBranchId, TmpAccountId} end; true -> %% Pick from a local account RelativeAccountId = trunc(Random * NA), TmpAccountId = (BranchId * NA) + RelativeAccountId, {BranchId, TmpAccountId} end, {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}. real_trans(RC, BranchId, Fun, Args, Mod) -> Type = RC#run_config.activity_type, case RC#run_config.spawn_near_branch of false -> mnesia:activity(Type, Fun, Args, Mod); true -> Node = table_info({branch, BranchId}, where_to_read), case rpc:call(Node, mnesia, activity, [Type, Fun, Args, Mod]) of {badrpc, Reason} -> exit(Reason); Other -> Other end end. trans_type(TC, RC) -> if TC#tab_config.n_fragments =:= 0, RC#run_config.use_sticky_locks =:= false -> {fun add_delta/7, mnesia}; TC#tab_config.n_fragments =:= 0, RC#run_config.use_sticky_locks =:= true -> {fun sticky_add_delta/7, mnesia}; TC#tab_config.n_fragments > 0, RC#run_config.use_sticky_locks =:= false -> {fun frag_add_delta/7, mnesia_frag} end. %% %% Runs the TPC-B defined transaction and returns NewAccountBalance %% add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) -> %% Grab write lock already when the record is read %% Add delta to branch balance [B] = mnesia:read(branch, BranchId, write), NewB = B#branch{balance = B#branch.balance + Delta}, ok = mnesia:write(branch, NewB, write), %% Add delta to teller balance [T] = mnesia:read(teller, TellerId, write), NewT = T#teller{balance = T#teller.balance + Delta}, ok = mnesia:write(teller, NewT, write), %% Add delta to account balance [A] = mnesia:read(account, AccountId, write), NewA = A#account{balance = A#account.balance + Delta}, ok = mnesia:write(account, NewA, write), %% Append to history log History = #history{history_id = {DriverId, HistoryId}, account_id = AccountId, teller_id = TellerId, branch_id = BranchId, amount = Delta }, ok = mnesia:write(history, History, write), %% Return account balance NewA#account.balance. sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) -> %% Grab orinary read lock when the record is read %% Grab sticky write lock when the record is written %% This transaction would benefit of an early stick_write lock at read %% Add delta to branch balance [B] = mnesia:read(branch, BranchId, read), NewB = B#branch{balance = B#branch.balance + Delta}, ok = mnesia:write(branch, NewB, sticky_write), %% Add delta to teller balance [T] = mnesia:read(teller, TellerId, read), NewT = T#teller{balance = T#teller.balance + Delta}, ok = mnesia:write(teller, NewT, sticky_write), %% Add delta to account balance [A] = mnesia:read(account, AccountId, read), NewA = A#account{balance = A#account.balance + Delta}, ok = mnesia:write(account, NewA, sticky_write), %% Append to history log History = #history{history_id = {DriverId, HistoryId}, account_id = AccountId, teller_id = TellerId, branch_id = BranchId, amount = Delta }, ok = mnesia:write(history, History, sticky_write), %% Return account balance NewA#account.balance. frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) -> %% Access fragmented table %% Grab write lock already when the record is read %% Add delta to branch balance [B] = mnesia:read(branch, BranchId, write), NewB = B#branch{balance = B#branch.balance + Delta}, ok = mnesia:write(NewB), %% Add delta to teller balance [T] = mnesia:read({teller, BranchId}, TellerId, write), NewT = T#teller{balance = T#teller.balance + Delta}, ok = mnesia:write(NewT), %% Add delta to account balance %%io:format("frag_add_delta(~p): ~p\n", [node(), {account, BranchId, AccountId}]), [A] = mnesia:read({account, AccountBranchId}, AccountId, write), NewA = A#account{balance = A#account.balance + Delta}, ok = mnesia:write(NewA), %% Append to history log History = #history{history_id = {DriverId, HistoryId}, account_id = AccountId, teller_id = TellerId, branch_id = BranchId, amount = Delta }, ok = mnesia:write(History), %% Return account balance NewA#account.balance. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Verify table consistency verify_tabs() -> Nodes = mnesia:system_info(running_db_nodes), case lists:member(node(), Nodes) of true -> Tabs = [branch, teller, account, history], io:format("TPC-B: Verifying tables: ~w~n", [Tabs]), rpc:multicall(Nodes, mnesia, wait_for_tables, [Tabs, infinity]), Fun = fun() -> mnesia:write_lock_table(branch), mnesia:write_lock_table(teller), mnesia:write_lock_table(account), mnesia:write_lock_table(history), {Res, BadNodes} = rpc:multicall(Nodes, ?MODULE, count_balance, []), check_balance(Res, BadNodes) end, case mnesia:transaction(Fun) of {atomic, Res} -> Res; {aborted, Reason} -> {error, Reason} end; false -> {error, "Must be initiated from a running db_node"} end. %% Returns a list of {Table, Node, Balance} tuples %% Assumes that no updates are performed -record(summary, {table, node, balance, size}). count_balance() -> [count_balance(branch, #branch.balance), count_balance(teller, #teller.balance), count_balance(account, #account.balance)]. count_balance(Tab, BalPos) -> Frags = table_info(Tab, frag_names), count_balance(Tab, Frags, 0, 0, BalPos). count_balance(Tab, [Frag | Frags], Bal, Size, BalPos) -> First = mnesia:dirty_first(Frag), {Bal2, Size2} = count_frag_balance(Frag, First, Bal, Size, BalPos), count_balance(Tab, Frags, Bal2, Size2, BalPos); count_balance(Tab, [], Bal, Size, _BalPos) -> #summary{table = Tab, node = node(), balance = Bal, size = Size}. count_frag_balance(_Frag, '$end_of_table', Bal, Size, _BalPos) -> {Bal, Size}; count_frag_balance(Frag, Key, Bal, Size, BalPos) -> [Record] = mnesia:dirty_read({Frag, Key}), Bal2 = Bal + element(BalPos, Record), Next = mnesia:dirty_next(Frag, Key), count_frag_balance(Frag, Next, Bal2, Size + 1, BalPos). check_balance([], []) -> mnesia:abort({"No balance"}); check_balance(Summaries, []) -> [One | Rest] = lists:flatten(Summaries), Balance = One#summary.balance, %% Size = One#summary.size, case [S || S <- Rest, S#summary.balance =/= Balance] of [] -> ok; BadSummaries -> mnesia:abort({"Bad balance", One, BadSummaries}) end; check_balance(_, BadNodes) -> mnesia:abort({"Bad nodes", BadNodes}).