diff options
Diffstat (limited to 'lib/mnesia/examples')
24 files changed, 3765 insertions, 0 deletions
diff --git a/lib/mnesia/examples/DATA b/lib/mnesia/examples/DATA new file mode 120000 index 0000000000..2c2314b960 --- /dev/null +++ b/lib/mnesia/examples/DATA @@ -0,0 +1 @@ +../doc/src/DATA
\ No newline at end of file diff --git a/lib/mnesia/examples/Makefile b/lib/mnesia/examples/Makefile new file mode 100644 index 0000000000..ff00ee76a5 --- /dev/null +++ b/lib/mnesia/examples/Makefile @@ -0,0 +1,103 @@ +# +# %CopyrightBegin% +# +# Copyright Ericsson AB 1996-2009. All Rights Reserved. +# +# The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved online at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# %CopyrightEnd% +# + +# +include $(ERL_TOP)/make/target.mk +include $(ERL_TOP)/make/$(TARGET)/otp.mk + +# ---------------------------------------------------- +# Application version +# ---------------------------------------------------- +include ../vsn.mk +VSN=$(MNESIA_VSN) + +# ---------------------------------------------------- +# Release Macros +# ---------------------------------------------------- +RELSYSDIR = $(RELEASE_PATH)/lib/mnesia-$(VSN) + +# ---------------------------------------------------- +# Common Macros +# ---------------------------------------------------- + + +MODULES = \ + company \ + company_o \ + bup \ + mnesia_meter \ + mnesia_tpcb + +ERL_FILES= $(MODULES:=.erl) + +HRL_FILES = \ + company.hrl \ + company_o.hrl + +DATA_FILES = \ + DATA + +# TARGET_FILES= $(MODULES:%=$(EBIN)/%.$(EMULATOR)) +TARGET_FILES = + +# ---------------------------------------------------- +# FLAGS +# ---------------------------------------------------- +ERL_COMPILE_FLAGS += -pa ../ebin +EBIN = . 
+ +# ---------------------------------------------------- +# Make Rules +# ---------------------------------------------------- +debug opt: $(TARGET_FILES) + +clean: + rm -f $(TARGET_FILES) *~ + +docs: + +# ---------------------------------------------------- +# Release Targets +# ---------------------------------------------------- +include $(ERL_TOP)/make/otp_release_targets.mk + +release_spec: opt + $(INSTALL_DIR) $(RELSYSDIR)/examples + $(INSTALL_DATA) $(ERL_FILES) $(DATA_FILES) $(HRL_FILES) $(RELSYSDIR)/examples + $(INSTALL_DIR) $(RELSYSDIR)/examples/bench + (cd bench; $(INSTALL_DATA) \ + Makefile \ + README \ + bench.erl \ + bench.hrl \ + bench_generate.erl \ + bench_populate.erl \ + bench_trans.erl \ + bench.config1 \ + bench.config2 \ + bench.config3 \ + bench.config4 \ + bench.config5 \ + bench.config6 \ + bench.config7 \ + $(RELSYSDIR)/examples/bench) + (cd bench; $(INSTALL_SCRIPT) bench.sh $(RELSYSDIR)/examples/bench) + +release_docs_spec: + diff --git a/lib/mnesia/examples/bench/Makefile b/lib/mnesia/examples/bench/Makefile new file mode 100644 index 0000000000..55621e8cf4 --- /dev/null +++ b/lib/mnesia/examples/bench/Makefile @@ -0,0 +1,10 @@ + +all: + erl -make + +clean: + rm *.beam + +test: + ./bench.sh bench.config* + diff --git a/lib/mnesia/examples/bench/README b/lib/mnesia/examples/bench/README new file mode 100644 index 0000000000..5d31b5ba25 --- /dev/null +++ b/lib/mnesia/examples/bench/README @@ -0,0 +1,211 @@ +Author : Hakan Mattsson <[email protected]> +Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> + +This is an implementation of a real-time database benchmark +(LMC/UU-01:025), defined by Richard Trembley (LMC) and Miroslaw +Zakrzewski (LMC) . The implementation runs the benchmark on the Mnesia +DBMS which is a part of Erlang/OTP (www.erlang.org). 
+ +The implementation is organized in the following parts: + + bench.erl - main API, startup and configuration + bench.hrl - record definitions + bench_populate.erl - create database and populate it with records + bench_trans.erl - the actual transactions to be benchmarked + bench_generate.erl - request generator, statistics computation + +Compile the files with: + + make all + +and run the benchmarks with: + + make test + +================================================================ + +The benchmark runs on a set of Erlang nodes which should reside on +one processor each. + +There are many options when running the benchmark. Benchmark +configuration parameters may either be stated in a configuration file +or as command line arguments in the Erlang shell. Erlang nodes may +either be started manually or automatically by the benchmark program. + +In its the most automated usage you only need to provide one or more +configuration files and run the + + bench.sh <ConfigFiles> + +script to start all Erlang nodes, populate the database and run the +actual benchmark for each one of the configuration files. The +benchmark results will be displayed at stdout. + +In order to be able to automatically start remote Erlang nodes, +you need to: + + - put the $ERL_TOP/bin directory in your path on all nodes + - bind IP adresses to hostnames (e.g via DNS or /etc/hosts) + - enable usage of rsh so it does not prompt for password + +If you cannot achieve this, it is possible to run the benchmark +anyway, but it requires more manual work to be done for each +execution of the benchmark. + +================================================================ + +For each configuration file given to the bench.sh script: + + - a brand new Erlang node is started + - the bench:run(['YourConfigFile']) function is invoked + - the Erlang node(s) are halted. + +Without arguments, the bench.sh simply starts an Erlang shell. 
+In that shell you have the ability to invoke Erlang functions, +such as bench:run/1. + +The bench:start_all/1 function analyzes the configuration, starts +all Erlang nodes necessary to perform the benchmark and starts +Mnesia on all these nodes. + +The bench:populate/1 function populates the database according +to the configuration and assumes that Mnesia is up and running +on all nodes. + +The bench:generate/1 function starts the actual benchmark +according to the configuration and assumes that Mnesia is +up and running and that the database is fully populated. +Given some arguments such as + + Args = ['YourConfigFile', {statistics_detail, debug}]. + +the invokation of + + bench:run(Args). + +is equivivalent with: + + SlaveNodes = bench:start_all(Args). + bench:populate(Args). + bench:generate(Args). + bench:stop_slave_nodes(SlaveNodes). + +In case you cannot get the automatic start of remote Erlang nodes to +work (implied by bench:start_all/1) , you may need to manually start +an Erlang node on each host (e.g. with bench.sh without arguments) and +then invoke bench:run/1 or its equivivalents on one of them. + +================================================================ + +The following configuration parameters are valid: + +generator_profile + + Selects the transaction profile of the benchmark. Must be one + of the following atoms: t1, t2, t3, t4, t5, ping, random. + Defaults to random which means that the t1 .. t5 transaction + types are randomly selected according to the benchmark spec. + The other choices means disables the random choice and selects + one particular transaction type to be run over and over again. + +generator_warmup + + Defines how long the request generators should "warm up" the + DBMS before the actual measurements are performed. The unit + is milliseconds and defaults to 2000 (2 seconds). + +generator_duration + + Defines the duration of the actual benchmark measurement activity. 
+ The unit is milliseconds and defaults to 15000 (15 seconds). + +generator_cooldown + + Defines how long the request generators should "cool down" the + DBMS after the actual measurements has been performed. The unit + is milliseconds and defaults to 2000 (2 seconds). + +generator_nodes + + Defines which Erlang nodes that should host request generators. + The default is all connected nodes. + +n_generators_per_node + + Defines how many generator processes that should be running on + each generator node. The default is 2. + +statistics_detail + + Regulates the detail level of statistics. It must be one of the + following atoms: normal, debug and debug2. debug enables a + finer grain of statistics to be reported, but since it requires + more counters, to be updated by the generator processes it may + cause slightly worse benchmark performace figures than the brief + default case, that is normal. debug2 prints out the debug info + and formats it according to LMC's benchmark program. + +storage_type + + Defines whether the database should be kept solely in primary + memory (ram_copies), solely on disc (disc_only_copies) or + in both (disc_copies). The default is ram_copies. Currently + the other choices requires a little bit of manual preparation. + +table_nodes + + Defines which Erlang nodes that should host the tables. + +n_fragments + + Defines how many fragments each table should be divided in. + Default is 100. The fragments are evenly distributed over + all table nodes. The group table not devided in fragments. + +n_replicas + + Defines how many replicas that should be kept of each fragment. + The group table is replicated to all table nodes. + +n_subscribers + + Defines the number of subscriber records. Default 25000. + +n_subscribers + + Defines the number of subscriber records. Default 25000. + +n_groups + + Defines the number of group records. Default 5. + +n_servers + + Defines the number of server records. Default 1. 
+ +write_lock_type + + Defines whether the transactions should use ordinary + write locks or if they utilize sticky write locks. + Must be one of the following atoms: write, sticky_write. + Default is write. + +use_binary_subscriber_key + + Defines whether the subscriber key should be represented + as a string (binary) or as an integer. Default is false. + +always_try_nearest_node + + The benchmark was initially written to test scalability + when more nodes were added to the database and when the + (fragmented) tables were distributed over all nodes. In + such a system the transactions should be evenly distributed + over all nodes. When this option is set to true it is possible + to make fair measurements of master/slave configurations, when + all transactions are performed on on one node. Default is false. + +cookie + + Defines which cookie the Erlang node should use in its + distribution protocol. Must be an atom, default is 'bench'. diff --git a/lib/mnesia/examples/bench/bench.config1 b/lib/mnesia/examples/bench/bench.config1 new file mode 100644 index 0000000000..e53ce51f63 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config1 @@ -0,0 +1,21 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb1 + ]}. +{storage_type, ram_copies}. +{n_replicas, 1}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.config2 b/lib/mnesia/examples/bench/bench.config2 new file mode 100644 index 0000000000..f2f82f01fa --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config2 @@ -0,0 +1,21 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. 
+{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb2 + ]}. +{storage_type, ram_copies}. +{n_replicas, 1}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.config3 b/lib/mnesia/examples/bench/bench.config3 new file mode 100644 index 0000000000..c96e4531fd --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config3 @@ -0,0 +1,23 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1, + bench@wppgpb2 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb3, + bench@wppgpb4 + ]}. +{storage_type, ram_copies}. +{n_replicas, 2}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.config4 b/lib/mnesia/examples/bench/bench.config4 new file mode 100644 index 0000000000..e7c0bf2151 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config4 @@ -0,0 +1,23 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1, + bench@wppgpb2 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb1, + bench@wppgpb2 + ]}. +{storage_type, ram_copies}. +{n_replicas, 2}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. 
diff --git a/lib/mnesia/examples/bench/bench.config5 b/lib/mnesia/examples/bench/bench.config5 new file mode 100644 index 0000000000..623ec3fb73 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config5 @@ -0,0 +1,27 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1, + bench@wppgpb2, + bench@wppgpb3, + bench@wppgpb4 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb1, + bench@wppgpb2, + bench@wppgpb3, + bench@wppgpb4 + ]}. +{storage_type, ram_copies}. +{n_replicas, 2}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.config6 b/lib/mnesia/examples/bench/bench.config6 new file mode 100644 index 0000000000..f056890ff4 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config6 @@ -0,0 +1,27 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. +{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1, + bench@wppgpb2, + bench@wppgpb3, + bench@wppgpb4 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb5, + bench@wppgpb6, + bench@wppgpb7, + bench@wppgpb8 + ]}. +{storage_type, ram_copies}. +{n_replicas, 2}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.config7 b/lib/mnesia/examples/bench/bench.config7 new file mode 100644 index 0000000000..6a78570e71 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.config7 @@ -0,0 +1,35 @@ +{cookie, bench_cookie}. +{generator_profile, random}. +{statistics_detail, debug}. +{generator_warmup, 120000}. +{generator_duration, 900000}. 
+{generator_cooldown, 120000}. +{generator_nodes, + [bench@wppgpb1, + bench@wppgpb2, + bench@wppgpb3, + bench@wppgpb4, + bench@wppgpb5, + bench@wppgpb6, + bench@wppgpb7, + bench@wppgpb8 + ]}. +{use_binary_subscriber_key, false}. +{n_generators_per_node, 2}. +{write_lock_type, sticky_write}. +{table_nodes, + [bench@wppgpb1, + bench@wppgpb2, + bench@wppgpb3, + bench@wppgpb4, + bench@wppgpb5, + bench@wppgpb6, + bench@wppgpb7, + bench@wppgpb8 + ]}. +{storage_type, ram_copies}. +{n_replicas, 2}. +{n_fragments, 100}. +{n_subscribers, 500000}. +{n_groups, 100}. +{n_servers, 20}. diff --git a/lib/mnesia/examples/bench/bench.erl b/lib/mnesia/examples/bench/bench.erl new file mode 100644 index 0000000000..d191169296 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.erl @@ -0,0 +1,327 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% File : bench.hrl +%%% Author : Hakan Mattsson <[email protected]> +%%% Purpose : Implement the Canadian database benchmark (LMC/UU-01:025) +%%% Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-module(bench). +-author('[email protected]'). + +-include("bench.hrl"). 
+ +-export([ + run/0, run/1, + + start_all/0, start_all/1, + populate/0, populate/1, + generate/0, generate/1, + + args_to_config/1, verify_config/2, + start/0, start/1, + stop_slave_nodes/1, + bind_schedulers/0 + ]). + +bind_schedulers() -> + try + %% Avoid first core and bind schedules to the remaining ones + Topo = erlang:system_info(cpu_topology), + erlang:system_flag(cpu_topology,lists:reverse(Topo)), + %% N = erlang:system_info(schedulers), + %% erlang:system_flag(schedulers_online, lists:max([N - 1, 1])), + erlang:system_flag(scheduler_bind_type, default_bind), + timer:sleep(timer:seconds(1)), % Wait for Rickard + erlang:system_info(scheduler_bindings) + catch _:_ -> + %% Ancient systems + ignore + end. + +%% Run the benchmark: +%% +%% - Start all necessary Erlang nodes +%% - Populate the database +%% - Start the traffic generators +%% - Calculate benchmark statistics +%% - Stop the temporary Erlang nodes +run() -> + FileName = "bench.config", + run([FileName]). + +run(Args) -> + C = args_to_config(Args), + SlaveNodes = start_all(C), + bench_populate:start(C), + Result = bench_generate:start(C), + stop_slave_nodes(SlaveNodes), + Result. + +%% Start Mnesia on the local node +start() -> + FileName = 'bench.config', + start([FileName]). + +start(Args) -> + C = args_to_config(Args), + erlang:set_cookie(node(), C#config.cookie), + Nodes = [node() | (((C#config.table_nodes -- C#config.generator_nodes) ++ + C#config.generator_nodes) -- [node()])], + Extra = [{extra_db_nodes, Nodes}], + ?d("Starting Mnesia on node ~p...", [node()]), + case mnesia:start(Extra) of + ok -> + Tables = mnesia:system_info(tables), + io:format(" ok.~n" , []), + ?d("Waiting for ~p tables...", [length(Tables)]), + wait(Tables); + {error, Reason} -> + io:format(" FAILED: ~p~n", [Reason]), + {error, Reason} + end. 
+ +wait(Tables) -> + case mnesia:wait_for_tables(Tables, timer:seconds(10)) of + ok -> + io:format(" loaded.~n", []), + ok; + {timeout, More} -> + io:format(" ~p...", [length(More)]), + wait(More) + end. + +%% Populate the database +populate() -> + FileName = 'bench.config', + populate([FileName]). + +populate(Args) -> + C = args_to_config(Args), + bench_populate:start(C). + +%% Start the traffic generators +generate() -> + FileName = 'bench.config', + generate([FileName]). + +generate(Args) -> + C = args_to_config(Args), + bench_generate:start(C). + +start_all() -> + FileName = 'bench.config', + start_all([FileName]). + +start_all(Args) -> + C = args_to_config(Args), + Nodes = [node() | (((C#config.table_nodes -- C#config.generator_nodes) ++ + C#config.generator_nodes) -- [node()])], + erlang:set_cookie(node(), C#config.cookie), + ?d("Starting Erlang nodes...~n", []), + ?d("~n", []), + SlaveNodes = do_start_all(Nodes, [], C#config.cookie), + Extra = [{extra_db_nodes, Nodes}], + ?d("~n", []), + ?d("Starting Mnesia...", []), + case rpc:multicall(Nodes, mnesia, start, [Extra]) of + {Replies, []} -> + case [R || R <- Replies, R /= ok] of + [] -> + io:format(" ok~n", []), + SlaveNodes; + Bad -> + io:format(" FAILED: ~p~n", [Bad]), + exit({mnesia_start, Bad}) + end; + Bad -> + io:format(" FAILED: ~p~n", [Bad]), + exit({mnesia_start, Bad}) + end. 
+ +do_start_all([Node | Nodes], Acc, Cookie) when is_atom(Node) -> + case string:tokens(atom_to_list(Node), [$@]) of + [Name, Host] -> + Arg = lists:concat(["-setcookie ", Cookie]), + ?d(" ~s", [left(Node)]), + case slave:start_link(Host, Name, Arg) of + {ok, Node} -> + load_modules(Node), + rpc:call(Node, ?MODULE, bind_schedulers, []), + io:format(" started~n", []), + do_start_all(Nodes, [Node | Acc], Cookie); + {error, {already_running, Node}} -> + rpc:call(Node, ?MODULE, bind_schedulers, []), + io:format(" already started~n", []), + do_start_all(Nodes, Acc, Cookie); + {error, Reason} -> + io:format(" FAILED:~p~n", [Reason]), + stop_slave_nodes(Acc), + exit({slave_start_failed, Reason}) + end; + _ -> + ?d(" ~s FAILED: " + "Not valid as node name. Must be 'name@host'.~n", + [left(Node)]), + stop_slave_nodes(Acc), + exit({bad_node_name, Node}) + end; +do_start_all([], StartedNodes, _Cookie) -> + StartedNodes. + +load_modules(Node) -> + Fun = + fun(Mod) -> + case code:get_object_code(Mod) of + {_Module, Bin, Fname} -> + rpc:call(Node, code,load_binary,[Mod,Fname,Bin]); + Other -> + Other + end + end, + lists:foreach(Fun, [bench, bench_generate, bench_populate, bench_trans]). + +stop_slave_nodes([]) -> + ok; +stop_slave_nodes(Nodes) -> + ?d("~n", []), + ?d("Stopping Erlang nodes...~n", []), + ?d("~n", []), + do_stop_slave_nodes(Nodes). + +do_stop_slave_nodes([Node | Nodes]) -> + ?d(" ~s", [left(Node)]), + Res = slave:stop(Node), + io:format(" ~p~n", [Res]), + do_stop_slave_nodes(Nodes); +do_stop_slave_nodes([]) -> + ok. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% The configuration +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +args_to_config(C) when is_record(C, config) -> + C; +args_to_config(Args) when is_list(Args) -> + do_args_to_config(Args, []). 
+ +do_args_to_config([{Key, Val} | Rest], Acc) when is_list(Acc) -> + do_args_to_config(Rest, Acc ++ [{Key, Val}]); +do_args_to_config([FileName | Rest], Acc) when is_list(Acc) -> + io:nl(), + ?d("Reading configuration file ~p...", [FileName]), + case file:consult(FileName) of + {ok, Config} -> + io:format(" ok~n", []), + do_args_to_config(Rest, Acc ++ Config); + {error, Reason} -> + io:format(" FAILED: ~s~n", + [[lists:flatten(file:format_error( Reason))]]), + {error, {args_to_config, FileName, Reason}} + end; +do_args_to_config([], Acc) when is_list(Acc) -> + verify_config(Acc, #config{}). + +verify_config([{Tag, Val} | T], C) -> + case Tag of + cookie when is_atom(Val) -> + verify_config(T, C#config{cookie = Val}); + generator_profile when Val == random -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == t1 -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == t2 -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == t3 -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == t4 -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == t5 -> + verify_config(T, C#config{generator_profile = Val}); + generator_profile when Val == ping -> + verify_config(T, C#config{generator_profile = Val}); + generator_nodes when is_list(Val) -> + verify_config(T, C#config{generator_nodes = Val}); + n_generators_per_node when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_generators_per_node = Val}); + generator_warmup when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{generator_warmup = Val}); + generator_duration when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{generator_duration = Val}); + generator_cooldown when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{generator_cooldown = Val}); + statistics_detail when Val == debug -> + verify_config(T, 
C#config{statistics_detail = Val}); + statistics_detail when Val == debug2 -> + verify_config(T, C#config{statistics_detail = Val}); + statistics_detail when Val == normal -> + verify_config(T, C#config{statistics_detail = Val}); + table_nodes when is_list(Val) -> + verify_config(T, C#config{table_nodes = Val}); + use_binary_subscriber_key when Val == true -> + verify_config(T, C#config{use_binary_subscriber_key = Val}); + use_binary_subscriber_key when Val == false -> + verify_config(T, C#config{use_binary_subscriber_key = Val}); + storage_type when is_atom(Val) -> + verify_config(T, C#config{storage_type = Val}); + write_lock_type when Val == sticky_write -> + verify_config(T, C#config{write_lock_type = Val}); + write_lock_type when Val == write -> + verify_config(T, C#config{write_lock_type = Val}); + n_replicas when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_replicas = Val}); + n_fragments when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_fragments = Val}); + n_subscribers when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_subscribers = Val}); + n_groups when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_groups = Val}); + n_servers when is_integer(Val), Val >= 0 -> + verify_config(T, C#config{n_servers = Val}); + always_try_nearest_node when Val == true; Val == false -> + verify_config(T, C#config{always_try_nearest_node = Val}); + _ -> + ?e("Bad config value: ~p~n", [Tag, Val]), + exit({bad_config_value, {Tag, Val}}) + end; +verify_config([], C) -> + display_config(C), + C; +verify_config(Config, _) -> + ?e("Bad config: ~p~n", [Config]), + exit({bad_config, Config}). + +display_config(C) when is_record(C, config) -> + ?d("~n", []), + ?d("Actual configuration...~n", []), + ?d("~n", []), + Fields = record_info(fields, config), + [config | Values] = tuple_to_list(C), + display_config(Fields, Values). 
+ +display_config([F | Fields], [V | Values]) -> + ?d(" ~s ~p~n", [left(F), V]), + display_config(Fields, Values); +display_config([], []) -> + ?d("~n", []), + ok. + +left(Term) -> + string:left(lists:flatten(io_lib:format("~p", [Term])), 27, $.). diff --git a/lib/mnesia/examples/bench/bench.hrl b/lib/mnesia/examples/bench/bench.hrl new file mode 100644 index 0000000000..7b0e0c1280 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.hrl @@ -0,0 +1,107 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. 
+%% +%% %CopyrightEnd% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% File : bench.hrl +%%% Author : Hakan Mattsson <[email protected]> +%%% Purpose : Define various database records +%%% Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(config, + { + generator_profile = random, + generator_warmup = timer:seconds(2), + generator_duration = timer:seconds(15), + generator_cooldown = timer:seconds(2), + generator_nodes = [node() | nodes()], + statistics_detail = debug, + n_generators_per_node = 1, + write_lock_type = sticky_write, + table_nodes = [node() | nodes()], + storage_type = ram_copies, + n_subscribers = 25000, + n_groups = 5, + n_servers = 1, + n_replicas = 1, + n_fragments = 100, + use_binary_subscriber_key = false, + always_try_nearest_node = false, + cookie = 'bench' + }). + +-record(subscriber, + { + subscriber_number, % string (10 chars) + subscriber_name, % string (32 chars) + group_id, % integer (uint32) + location, % integer (uint32) + active_sessions, % array of 32 booleans (32 bits) + changed_by, % string (25 chars) + changed_time, % string (25 chars) + suffix + }). + +-record(group, + { + group_id, % integer (uint32) + group_name, % string (32 chars) + allow_read, % array of 32 booleans (32 bits) + allow_insert, % array of 32 booleans (32 bits) + allow_delete % array of 32 booleans (32 bits) + }). + +-record(server, + { + server_key, % {ServerId, SubscriberNumberSuffix} + server_name, % string (32 chars) + no_of_read, % integer (uint32) + no_of_insert, % integer (uint32) + no_of_delete, % integer (uint32) + suffix + }). + +-record(session, + { + session_key, % {SubscriberNumber, ServerId} + session_details, % string (4000 chars) + suffix + }). + +-define(d(Format, Args), + io:format("~s" ++ Format, [string:left(lists:flatten(io_lib:format("~p(~p):", [?MODULE, ?LINE])), 30, $ ) | Args])). 
+ +-define(e(Format, Args), + begin + ok = error_logger:format("~p(~p): " ++ Format, [?MODULE, ?LINE | Args]), + timer:sleep(1000) + end). + +-define(ERROR(M, F, A, R), + ?e("~w:~w~p\n\t ->~p\n", [M, F, A, R])). + +-define(APPLY(M, F, A), + fun() -> + case catch apply(M, F, A) of + ok -> {ok, ok}; + {atomic, R} -> {ok, R}; + {ok, R} -> {ok, R}; + {aborted, R} -> ?ERROR(M, F, A, R); + {error, R} -> ?ERROR(M, F, A, R); + R -> ?ERROR(M, F, A, R) + end + end()). diff --git a/lib/mnesia/examples/bench/bench.sh b/lib/mnesia/examples/bench/bench.sh new file mode 100755 index 0000000000..1f8b5eec52 --- /dev/null +++ b/lib/mnesia/examples/bench/bench.sh @@ -0,0 +1,23 @@ +#!/bin/sh +# Author : Hakan Mattsson <[email protected]> +# Purpose : Simplify benchmark execution +# Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> +###################################################################### + +args="-pa .. -boot start_sasl -sasl errlog_type error -sname bench" +set -x + +if [ $# -eq 0 ] ; then + + erl $args + +else + + while [ $# -gt 0 ]; do + + erl $args -s bench run $1 -s erlang halt + shift + done + +fi + diff --git a/lib/mnesia/examples/bench/bench_generate.erl b/lib/mnesia/examples/bench/bench_generate.erl new file mode 100644 index 0000000000..0fccc6c082 --- /dev/null +++ b/lib/mnesia/examples/bench/bench_generate.erl @@ -0,0 +1,684 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. 
See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% File : bench_generate.hrl +%%% Author : Hakan Mattsson <[email protected]> +%%% Purpose : Start request generators and collect statistics +%%% Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-module(bench_generate). +-author('[email protected]'). + +-include("bench.hrl"). + +%% Public +-export([start/1]). + +%% Internal +-export([ + monitor_init/2, + generator_init/2, + worker_init/1 + ]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% The traffic generator +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% ------------------------------------------------------------------- +%% Start request generators +%% ------------------------------------------------------------------- + +start(C) when is_record(C, config) -> + MonPid = spawn_link(?MODULE, monitor_init, [C, self()]), + receive + {'EXIT', MonPid, Reason} -> + exit(Reason); + {monitor_done, MonPid, Res} -> + Res + end. + +monitor_init(C, Parent) when is_record(C, config) -> + process_flag(trap_exit, true), + %% net_kernel:monitor_nodes(true), %% BUGBUG: Needed in order to re-start generators + Nodes = C#config.generator_nodes, + PerNode = C#config.n_generators_per_node, + Timer = C#config.generator_warmup, + ?d("~n", []), + ?d("Start ~p request generators each at ~p nodes...~n", + [PerNode, length(Nodes)]), + ?d("~n", []), + warmup_sticky(C), + ?d(" ~p seconds warmup...~n", [Timer div 1000]), + Alive = spawn_generators(C, Nodes, PerNode), + erlang:send_after(Timer, self(), warmup_done), + monitor_loop(C, Parent, Alive, []). + +spawn_generators(C, Nodes, PerNode) -> + [spawn_link(Node, ?MODULE, generator_init, [self(), C]) || + Node <- Nodes, + _ <- lists:seq(1, PerNode)]. 

%% Select one node per fragment as its master node by taking a sticky
%% read lock on every table for that fragment's suffix.
warmup_sticky(C) ->
    Tabs = [subscriber, session, server, suffix],
    StickFrag =
        fun(Suffix) ->
                {[Node | _], _, Wlock} = nearest_node(Suffix, transaction, C),
                Stick = fun() ->
                                [mnesia:read({T, Suffix}, Suffix, Wlock) || T <- Tabs]
                        end,
                Args = [transaction, Stick, [], mnesia_frag],
                rpc:call(Node, mnesia, activity, Args)
        end,
    Suffixes = lists:seq(0, C#config.n_fragments - 1), % Assume even distrib.
    lists:foreach(StickFrag, Suffixes).

%% Main loop of the benchmark monitor. Drives the three phases
%% (warmup -> measurement -> cooldown) via timer messages and keeps
%% track of live and deceased generator processes.
monitor_loop(C, Parent, Alive, Deceased) ->
    receive
        warmup_done ->
            %% Throw away warmup numbers, then start the real run.
            multicall(Alive, reset_statistics),
            Timer = C#config.generator_duration,
            ?d("  ~p seconds actual benchmarking...~n", [Timer div 1000]),
            erlang:send_after(Timer, self(), measurement_done),
            monitor_loop(C, Parent, Alive, Deceased);
        measurement_done ->
            %% Snapshot the statistics before the cooldown phase.
            Stats = multicall(Alive, get_statistics),
            Timer = C#config.generator_cooldown,
            ?d("  ~p seconds cooldown...~n", [Timer div 1000]),
            erlang:send_after(Timer, self(), {cooldown_done, Stats}),
            monitor_loop(C, Parent, Alive, Deceased);
        {cooldown_done, Stats} ->
            multicall(Alive, stop),
            display_statistics(Stats, C),
            Parent ! {monitor_done, self(), ok},
            unlink(Parent),
            exit(monitor_done);
        {nodedown, _Node} ->
            monitor_loop(C, Parent, Alive, Deceased);
        {nodeup, Node} ->
            %% Re-spawn one generator per death recorded for this node.
            NeedsBirth = [N || N <- Deceased, N == Node],
            Born = spawn_generators(C, NeedsBirth, 1),
            monitor_loop(C, Parent, Born ++ Alive, Deceased -- NeedsBirth);
        {'EXIT', Pid, Reason} when Pid == Parent ->
            exit(Reason);
        {'EXIT', Pid, Reason} ->
            case lists:member(Pid, Alive) of
                true ->
                    ?d("Generator on node ~p died: ~p~n", [node(Pid), Reason]),
                    monitor_loop(C, Parent, Alive -- [Pid], [node(Pid) | Deceased]);
                false ->
                    monitor_loop(C, Parent, Alive, Deceased)
            end
    end.

%% Send Message to every pid and wait for all replies. A monitor per
%% pid guarantees that a dead process yields {'EXIT', Reason} instead
%% of a hang.
multicall(Pids, Message) ->
    PidRefs =
        [begin
             Ref = erlang:monitor(process, Pid),
             Pid ! {self(), Ref, Message},
             {Pid, Ref}
         end || Pid <- Pids],
    [receive
         {'DOWN', Ref, process, Pid, Reason} ->
             {Pid, {'EXIT', Reason}};
         {Pid, Ref, Reply} ->
             erlang:demonitor(Ref),
             {Pid, Reply}
     end || {Pid, Ref} <- PidRefs].

%% Initialize a traffic generator: wait for all tables, seed the
%% per-process random state and create the private session table.
generator_init(Monitor, C) ->
    process_flag(trap_exit, true),
    Tables = mnesia:system_info(tables),
    ok = mnesia:wait_for_tables(Tables, infinity),
    %% NOTE(review): erlang:now/0 and the random module are deprecated in
    %% modern OTP (use erlang:monotonic_time/1 and rand); kept for
    %% compatibility with the OTP release this example targets.
    {_Mega, Sec, Micro} = erlang:now(),
    Uniq = lists:sum(binary_to_list(term_to_binary(make_ref()))),
    random:seed(Uniq, Sec, Micro),
    Counters = reset_counters(C, C#config.statistics_detail),
    SessionTab = ets:new(bench_sessions, [public, {keypos, 1}]),
    generator_loop(Monitor, C, SessionTab, Counters).

%% Main loop of a traffic generator. Control messages are handled when
%% present; otherwise (after 0) one more transaction is generated,
%% executed and accounted for.
generator_loop(Monitor, C, SessionTab, Counters) ->
    receive
        {ReplyTo, Ref, get_statistics} ->
            Stats = get_counters(C, Counters),
            ReplyTo ! {self(), Ref, Stats},
            generator_loop(Monitor, C, SessionTab, Counters);
        {ReplyTo, Ref, reset_statistics} ->
            Stats = get_counters(C, Counters),
            Counters2 = reset_counters(C, Counters),
            ReplyTo ! {self(), Ref, Stats},
            generator_loop(Monitor, C, SessionTab, Counters2);
        {_ReplyTo, _Ref, stop} ->
            exit(shutdown);
        {'EXIT', Pid, Reason} when Pid == Monitor ->
            exit(Reason);
        {'EXIT', Pid, Reason} ->
            %% A remote worker died; forget it so a new one is spawned
            %% on the next call to that node.
            Node = node(Pid),
            ?d("Worker on node ~p(~p) died: ~p~n", [Node, node(), Reason]),
            Key = {worker, Node},
            case get(Key) of
                undefined -> ignore;
                Pid       -> erase(Key);
                _         -> ignore
            end,
            generator_loop(Monitor, C, SessionTab, Counters)
    after 0 ->
            {Name, {Nodes, Activity, Wlock}, Fun, CommitSessions} =
                gen_trans(C, SessionTab),
            Before = erlang:now(),
            Res = call_worker(Nodes, Activity, Fun, Wlock, mnesia_frag),
            After = erlang:now(),
            Elapsed = elapsed(Before, After),
            post_eval(Monitor, C, Elapsed, Res, Name, CommitSessions, SessionTab, Counters)
    end.

%% Perform a transaction on a node near the data.
%% If the first candidate node is the local node, run the activity
%% directly; otherwise delegate to a (cached) remote worker process.
call_worker([Node | _], Activity, Fun, Wlock, Mod) when Node == node() ->
    {Node, catch mnesia:activity(Activity, Fun, [Wlock], Mod)};
call_worker([Node | _] = Nodes, Activity, Fun, Wlock, Mod) ->
    Key = {worker, Node},
    case get(Key) of
        Pid when is_pid(Pid) ->
            Args = [Activity, Fun, [Wlock], Mod],
            Pid ! {activity, self(), Args},
            receive
                {'EXIT', Pid, Reason} ->
                    %% Worker died mid-request: drop it and retry elsewhere.
                    ?d("Worker on node ~p(~p) died: ~p~n", [Node, node(), Reason]),
                    erase(Key),
                    retry_worker(Nodes, Activity, Fun, Wlock, Mod, {'EXIT', Reason});
                {activity_result, Pid, Result} ->
                    case Result of
                        {'EXIT', {aborted, {not_local, _}}} ->
                            %% Data moved away from this node: retry on a spare.
                            retry_worker(Nodes, Activity, Fun, Wlock, Mod, Result);
                        _ ->
                            {Node, Result}
                    end
            end;
        undefined ->
            %% No cached worker for this node yet: spawn one and retry.
            GenPid = self(),
            Pid = spawn_link(Node, ?MODULE, worker_init, [GenPid]),
            put(Key, Pid),
            call_worker(Nodes, Activity, Fun, Wlock, Mod)
    end.

%% Retry on the remaining candidate nodes, excluding the failing one.
%% When only one spare remains, fall back to plain write locks.
retry_worker([], _Activity, _Fun, _Wlock, _Mod, Reason) ->
    {node(), Reason};
retry_worker([BadNode | SpareNodes], Activity, Fun, Wlock, Mod, Reason) ->
    Nodes = SpareNodes -- [BadNode],
    case Nodes of
        [] ->
            {BadNode, Reason};
        [_] ->
            call_worker(Nodes, Activity, Fun, write, Mod);
        _ ->
            call_worker(Nodes, Activity, Fun, Wlock, Mod)
    end.

%% Entry point of a remote worker process.
worker_init(Parent) ->
    Tables = mnesia:system_info(tables),
    ok = mnesia:wait_for_tables(Tables, infinity),
    worker_loop(Parent).

%% Main loop of a remote worker: run one activity per request from the
%% owning generator and send the (possibly caught) result back.
worker_loop(Parent) ->
    receive
        {activity, Parent, [Activity, Fun, Extra, Mod]} ->
            Result = (catch mnesia:activity(Activity, Fun, Extra, Mod)),
            Parent ! {activity_result, self(), Result},
            worker_loop(Parent)
    end.

%% Difference between two erlang:now/0 timestamps, in microseconds.
elapsed({BeforeMega, BeforeSec, BeforeMicro}, {AfterMega, AfterSec, AfterMicro}) ->
    After  = AfterMega * 1000000000000 + AfterSec * 1000000 + AfterMicro,
    Before = BeforeMega * 1000000000000 + BeforeSec * 1000000 + BeforeMicro,
    After - Before.

%% Read the current statistics counters.
%% Debug mode keeps per-{Trans,Name,Node} counters in an ets table;
%% normal mode keeps just four integers in a tuple.
get_counters(_C, {table, Tab}) ->
    ets:match_object(Tab, '_');
get_counters(_C, {NM, NC, NA, NB}) ->
    Trans = any,
    Node = somewhere,
    [{{Trans, n_micros, Node}, NM},
     {{Trans, n_commits, Node}, NC},
     {{Trans, n_aborts, Node}, NA},
     {{Trans, n_branches_executed, Node}, NB}].

% Clear all counters.
% The argument is either the statistics_detail config atom (on first
% call) or the previous counter structure (on reset).
reset_counters(_C, normal) ->
    {0, 0, 0, 0};
reset_counters(C, {_, _, _, _}) ->
    reset_counters(C, normal);
reset_counters(C, debug) ->
    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),
    reset_counters(C, {table, CounterTab});
reset_counters(C, debug2) ->
    CounterTab = ets:new(bench_pending, [public, {keypos, 1}]),
    reset_counters(C, {table, CounterTab});
reset_counters(C, {table, Tab} = Counters) ->
    %% Pre-create one zeroed counter per transaction type, counter name
    %% and node so ets:update_counter/3 never hits a missing key.
    Names = [n_micros, n_commits, n_aborts, n_branches_executed],
    Nodes = C#config.generator_nodes ++ C#config.table_nodes,
    TransTypes = [t1, t2, t3, t4, t5, ping],
    [ets:insert(Tab, {{Trans, Name, Node}, 0}) || Name <- Names,
                                                  Node <- Nodes,
                                                  Trans <- TransTypes],
    Counters.

%% Determine the outcome of a transaction, bump the matching counters
%% and loop back into generator_loop/4. First clause handles the
%% ets-table (debug) counters, second the plain tuple counters.
post_eval(Monitor, C, Elapsed, {Node, Res}, Name, CommitSessions, SessionTab, {table, Tab} = Counters) ->
    case Res of
        {do_commit, BranchExecuted, _} ->
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_commits, Node}, 1),
            case BranchExecuted of
                true ->
                    incr(Tab, {Name, n_branches_executed, Node}, 1),
                    commit_session(CommitSessions),
                    generator_loop(Monitor, C, SessionTab, Counters);
                false ->
                    generator_loop(Monitor, C, SessionTab, Counters)
            end;
        {'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->
            %% Intentional rollback requested by the transaction itself.
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_aborts, Node}, 1),
            case BranchExecuted of
                true ->
                    incr(Tab, {Name, n_branches_executed, Node}, 1),
                    generator_loop(Monitor, C, SessionTab, Counters);
                false ->
                    generator_loop(Monitor, C, SessionTab, Counters)
            end;
        _ ->
            %% Unexpected failure: count it as an abort.
            ?d("Failed(~p): ~p~n", [Node, Res]),
            incr(Tab, {Name, n_micros, Node}, Elapsed),
            incr(Tab, {Name, n_aborts, Node}, 1),
            generator_loop(Monitor, C, SessionTab, Counters)
    end;
post_eval(Monitor, C, Elapsed, {_Node, Res}, _Name, CommitSessions, SessionTab, {NM, NC, NA, NB}) ->
    case Res of
        {do_commit, BranchExecuted, _} ->
            case BranchExecuted of
                true ->
                    commit_session(CommitSessions),
                    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC + 1, NA, NB + 1});
                false ->
                    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC + 1, NA, NB})
            end;
        {'EXIT', {aborted, {do_rollback, BranchExecuted, _}}} ->
            case BranchExecuted of
                true ->
                    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB + 1});
                false ->
                    generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB})
            end;
        _ ->
            ?d("Failed: ~p~n", [Res]),
            generator_loop(Monitor, C, SessionTab, {NM + Elapsed, NC, NA + 1, NB})
    end.

%% Atomically increment one debug counter.
incr(Tab, Counter, Incr) ->
    ets:update_counter(Tab, Counter, Incr).

%% Run the session bookkeeping closure attached to a committed
%% transaction, if any.
commit_session(no_fun) ->
    ignore;
commit_session(Fun) when is_function(Fun, 0) ->
    Fun().

%% Randomly choose a transaction type according to the benchmark spec
%% (25% T1, 25% T2, 20% T3, 15% T4, 15% T5), or use the fixed profile
%% from the configuration.
gen_trans(C, SessionTab) when C#config.generator_profile == random ->
    case random:uniform(100) of
        Rand when Rand > 0,  Rand =< 25  -> gen_t1(C, SessionTab);
        Rand when Rand > 25, Rand =< 50  -> gen_t2(C, SessionTab);
        Rand when Rand > 50, Rand =< 70  -> gen_t3(C, SessionTab);
        Rand when Rand > 70, Rand =< 85  -> gen_t4(C, SessionTab);
        Rand when Rand > 85, Rand =< 100 -> gen_t5(C, SessionTab)
    end;
gen_trans(C, SessionTab) ->
    case C#config.generator_profile of
        t1   -> gen_t1(C, SessionTab);
        t2   -> gen_t2(C, SessionTab);
        t3   -> gen_t3(C, SessionTab);
        t4   -> gen_t4(C, SessionTab);
        t5   -> gen_t5(C, SessionTab);
        ping -> gen_ping(C, SessionTab)
    end.

%% T1: update the current location of a random subscriber.
gen_t1(C, _SessionTab) ->
    SubscrId = random:uniform(C#config.n_subscribers) - 1,
    SubscrKey = bench_trans:number_to_key(SubscrId, C),
    Location = 4711,
    ChangedBy = <<4711:(8*25)>>,
    ChangedTime = <<4711:(8*25)>>,
    {t1,
     nearest_node(SubscrId, transaction, C),
     fun(Wlock) -> bench_trans:update_current_location(Wlock, SubscrKey, Location, ChangedBy, ChangedTime) end,
     no_fun
    }.

%% T2: read the current location of a random subscriber (dirty).
gen_t2(C, _SessionTab) ->
    SubscrId = random:uniform(C#config.n_subscribers) - 1,
    SubscrKey = bench_trans:number_to_key(SubscrId, C),
    {t2,
     nearest_node(SubscrId, sync_dirty, C),
     %%nearest_node(SubscrId, transaction, C),
     fun(Wlock) -> bench_trans:read_current_location(Wlock, SubscrKey) end,
     no_fun
    }.

%% T3: read session details. Prefer one of our own sessions; if this
%% generator owns none, probe a random subscriber's session instead.
gen_t3(C, SessionTab) ->
    case ets:first(SessionTab) of
        '$end_of_table' ->
            %% This generator does not have any session,
            %% try reading someone else's session details.
            SubscrId = random:uniform(C#config.n_subscribers) - 1,
            SubscrKey = bench_trans:number_to_key(SubscrId, C),
            ServerId = random:uniform(C#config.n_servers) - 1,
            ServerBit = 1 bsl ServerId,
            {t3,
             nearest_node(SubscrId, transaction, C),
             fun(Wlock) -> bench_trans:read_session_details(Wlock, SubscrKey, ServerBit, ServerId) end,
             no_fun
            };
        {SubscrId, SubscrKey, ServerId} ->
            %% This generator does have a session,
            %% read its session details.
            ServerBit = 1 bsl ServerId,
            {t3,
             nearest_node(SubscrId, transaction, C),
             fun(Wlock) -> bench_trans:read_session_details(Wlock, SubscrKey, ServerBit, ServerId) end,
             no_fun
            }
    end.

%% T4: create a new session for a random subscriber and remember it in
%% the local session table on commit (2% of the runs roll back on
%% purpose, per the benchmark spec).
gen_t4(C, SessionTab) ->
    %% This generator may already have sessions; create a new one and
    %% hope that no other generator has already occupied it.
    SubscrId = random:uniform(C#config.n_subscribers) - 1,
    SubscrKey = bench_trans:number_to_key(SubscrId, C),
    ServerId = random:uniform(C#config.n_servers) - 1,
    ServerBit = 1 bsl ServerId,
    Details = <<4711:(8*2000)>>,
    DoRollback = (random:uniform(100) =< 2),
    Insert = fun() -> ets:insert(SessionTab, {{SubscrId, SubscrKey, ServerId}, self()}) end,
    {t4,
     nearest_node(SubscrId, transaction, C),
     fun(Wlock) -> bench_trans:create_session_to_server(Wlock, SubscrKey, ServerBit, ServerId, Details, DoRollback) end,
     Insert
    }.

%% T5: delete a session. Prefer one of our own; otherwise try a random
%% subscriber's session (2% of the runs roll back on purpose).
gen_t5(C, SessionTab) ->
    case ets:first(SessionTab) of
        '$end_of_table' ->
            %% This generator does not have any session,
            %% try to delete someone else's session details.
            SubscrId = random:uniform(C#config.n_subscribers) - 1,
            SubscrKey = bench_trans:number_to_key(SubscrId, C),
            ServerId = random:uniform(C#config.n_servers) - 1,
            ServerBit = 1 bsl ServerId,
            DoRollback = (random:uniform(100) =< 2),
            {t5,
             nearest_node(SubscrId, transaction, C),
             fun(Wlock) -> bench_trans:delete_session_from_server(Wlock, SubscrKey, ServerBit, ServerId, DoRollback) end,
             no_fun
            };
        {SubscrId, SubscrKey, ServerId} ->
            %% This generator does have at least one session; delete it.
            ServerBit = 1 bsl ServerId,
            DoRollback = (random:uniform(100) =< 2),
            Delete = fun() -> ets:delete(SessionTab, {SubscrId, SubscrKey, ServerId}) end,
            {t5,
             nearest_node(SubscrId, transaction, C),
             fun(Wlock) -> bench_trans:delete_session_from_server(Wlock, SubscrKey, ServerBit, ServerId, DoRollback) end,
             Delete
            }
    end.

%% Ping: a no-op transaction used to measure raw overhead.
gen_ping(C, _SessionTab) ->
    SubscrId = random:uniform(C#config.n_subscribers) - 1,
    {ping,
     nearest_node(SubscrId, transaction, C),
     fun(_Wlock) -> {do_commit, true, []} end,
     no_fun
    }.

%% Select a node as near to the subscriber data as possible.
%% Returns {CandidateNodes, Activity, WriteLockType} where the head of
%% CandidateNodes is the preferred node.
nearest_node(SubscrId, Activity, C) ->
    Suffix = bench_trans:number_to_suffix(SubscrId),
    case mnesia_frag:table_info(t, s, {suffix, Suffix}, where_to_write) of
        [] ->
            {[node()], Activity, write};
        [Node] ->
            {[Node], Activity, write};
        Nodes ->
            Wlock = C#config.write_lock_type,
            if
                C#config.always_try_nearest_node; Wlock =:= write ->
                    case lists:member(node(), Nodes) of
                        true ->
                            {[node() | Nodes], Activity, Wlock};
                        false ->
                            Node = pick_node(Suffix, C, Nodes),
                            {[Node | Nodes], Activity, Wlock}
                    end;
                Wlock == sticky_write ->
                    %% Stick to the deterministically chosen master node.
                    Node = pick_node(Suffix, C, Nodes),
                    {[Node | Nodes], Activity, Wlock}
            end
    end.

%% Deterministically pick one replica node for a fragment suffix, so
%% that all generators agree on a per-fragment "master" node.
pick_node(Suffix, C, Nodes) ->
    Ordered = lists:sort(Nodes),
    NumberOfActive = length(Ordered),
    PoolSize = length(C#config.table_nodes),
    Suffix2 =
        case PoolSize rem NumberOfActive of
            0 -> Suffix div (PoolSize div NumberOfActive);
            _ -> Suffix
        end,
    N = (Suffix2 rem NumberOfActive) + 1,
    lists:nth(N, Ordered).

%% Aggregate and print the statistics collected from all generators.
display_statistics(Stats, C) ->
    %% Ignore generators that failed to report (non-list results).
    GoodStats = [{node(GenPid), GenStats} || {GenPid, GenStats} <- Stats,
                                             is_list(GenStats)],
    FlatStats = [{Type, Name, EvalNode, GenNode, Count} ||
                    {GenNode, GenStats} <- GoodStats,
                    {{Type, Name, EvalNode}, Count} <- GenStats],
    TotalStats = calc_stats_per_tag(lists:keysort(2, FlatStats), 2, []),
    %% Append a zero entry per counter so the keysearch below always
    %% finds a match even when a counter never fired.
    {value, {n_aborts, 0, NA, 0, 0}} =
        lists:keysearch(n_aborts, 1, TotalStats ++ [{n_aborts, 0, 0, 0, 0}]),
    {value, {n_commits, NC, 0, 0, 0}} =
        lists:keysearch(n_commits, 1, TotalStats ++ [{n_commits, 0, 0, 0, 0}]),
    {value, {n_branches_executed, 0, 0, _NB, 0}} =
        lists:keysearch(n_branches_executed, 1, TotalStats ++ [{n_branches_executed, 0, 0, 0, 0}]),
    {value, {n_micros, 0, 0, 0, AccMicros}} =
        lists:keysearch(n_micros, 1, TotalStats ++ [{n_micros, 0, 0, 0, 0}]),
    NT = NA + NC,
    NG = length(GoodStats),
    NTN = length(C#config.table_nodes),
    WallMicros = C#config.generator_duration * 1000 * NG,
    %% catch guards against division by zero when nothing was measured.
    Overhead = (catch (WallMicros - AccMicros) / WallMicros),
    ?d("~n", []),
    ?d("Benchmark result...~n", []),
    ?d("~n", []),
    ?d("    ~p transactions per second (TPS).~n",
       [catch ((NT * 1000000 * NG) div AccMicros)]),
    ?d("    ~p TPS per table node.~n",
       [catch ((NT * 1000000 * NG) div (AccMicros * NTN))]),
    ?d("    ~p micro seconds in average per transaction, including latency.~n",
       [catch (AccMicros div NT)]),
    ?d("    ~p transactions. ~f% generator overhead.~n", [NT, Overhead * 100]),

    TypeStats = calc_stats_per_tag(lists:keysort(1, FlatStats), 1, []),
    EvalNodeStats = calc_stats_per_tag(lists:keysort(3, FlatStats), 3, []),
    GenNodeStats = calc_stats_per_tag(lists:keysort(4, FlatStats), 4, []),
    if
        C#config.statistics_detail == normal ->
            ignore;
        true ->
            ?d("~n", []),
            ?d("Statistics per transaction type...~n", []),
            ?d("~n", []),
            display_type_stats("    ", TypeStats, NT, AccMicros),

            ?d("~n", []),
            ?d("Transaction statistics per table node...~n", []),
            ?d("~n", []),
            display_calc_stats("    ", EvalNodeStats, NT, AccMicros),

            ?d("~n", []),
            ?d("Transaction statistics per generator node...~n", []),
            ?d("~n", []),
            display_calc_stats("    ", GenNodeStats, NT, AccMicros)
    end,
    if
        C#config.statistics_detail /= debug2 ->
            ignore;
        true ->
            io:format("~n", []),
            io:format("------ Test Results ------~n", []),
            io:format("Length        : ~p sec~n", [C#config.generator_duration div 1000]),
            Host = lists:nth(2, string:tokens(atom_to_list(node()), [$@])),
            io:format("Processor     : ~s~n", [Host]),
            io:format("Number of Proc: ~p~n", [NG]),
            io:format("~n", []),
            display_trans_stats("    ", TypeStats, NT, AccMicros, NG),
            io:format("~n", []),
            io:format("  Overall Statistics~n", []),
            io:format("     Transactions: ~p~n", [NT]),
            io:format("     Inner       : ~p TPS~n", [catch ((NT * 1000000 * NG) div AccMicros)]),
            io:format("     Outer       : ~p TPS~n", [catch ((NT * 1000000 * NG) div WallMicros)]),
            io:format("~n", [])
    end.


%% Print per-tag counts and time shares; entries with all-zero counters
%% are suppressed.
display_calc_stats(Prefix, [{_Tag, 0, 0, 0, 0} | Rest], NT, Micros) ->
    display_calc_stats(Prefix, Rest, NT, Micros);
display_calc_stats(Prefix, [{Tag, NC, NA, _NB, NM} | Rest], NT, Micros) ->
    ?d("~s~s n=~s%\ttime=~s%~n",
       [Prefix, left(Tag), percent(NC + NA, NT), percent(NM, Micros)]),
    display_calc_stats(Prefix, Rest, NT, Micros);
display_calc_stats(_, [], _, _) ->
    ok.

%% Print per-transaction-type statistics, including abort and branch
%% percentages; all-zero entries are suppressed.
display_type_stats(Prefix, [{_Tag, 0, 0, 0, 0} | Rest], NT, Micros) ->
    display_type_stats(Prefix, Rest, NT, Micros);
display_type_stats(Prefix, [{Tag, NC, NA, NB, NM} | Rest], NT, Micros) ->
    ?d("~s~s n=~s%\ttime=~s%\tavg micros=~p~n",
       [
        Prefix,
        left(Tag),
        percent(NC + NA, NT),
        percent(NM, Micros),
        catch (NM div (NC + NA))
       ]),
    case NA /= 0 of
        true  -> ?d("~s    ~s% aborted~n", [Prefix, percent(NA, NC + NA)]);
        false -> ignore
    end,
    case NB /= 0 of
        true  -> ?d("~s    ~s% branches executed~n", [Prefix, percent(NB, NC + NA)]);
        false -> ignore
    end,
    display_type_stats(Prefix, Rest, NT, Micros);
display_type_stats(_, [], _, _) ->
    ok.

%% Left-adjust a term into a fixed-width (27 chars) dotted column.
left(Term) ->
    string:left(lists:flatten(io_lib:format("~p", [Term])), 27, $.).

%% Format Part/Total as a percentage string; guard the zero-total case.
percent(_Part, 0) -> "infinity";
percent(Part, Total) -> io_lib:format("~8.4f", [(Part * 100) / Total]).

%% Fold the flat {Type,Name,EvalNode,GenNode,Count} statistics into one
%% {Tag, NC, NA, NB, NM} tuple per distinct value at position Pos.
%% The input must be sorted on Pos.
calc_stats_per_tag([], _Pos, Acc) ->
    lists:sort(Acc);
calc_stats_per_tag([Tuple | _] = FlatStats, Pos, Acc) when size(Tuple) == 5 ->
    Tag = element(Pos, Tuple),
    do_calc_stats_per_tag(FlatStats, Pos, {Tag, 0, 0, 0, 0}, Acc).

%% Accumulate counters while the tag at Pos stays the same, then emit
%% the finished tuple and continue with the next tag.
do_calc_stats_per_tag([Tuple | Rest], Pos, {Tag, NC, NA, NB, NM}, Acc)
  when element(Pos, Tuple) == Tag ->
    Val = element(5, Tuple),
    case element(2, Tuple) of
        n_commits ->
            do_calc_stats_per_tag(Rest, Pos, {Tag, NC + Val, NA, NB, NM}, Acc);
        n_aborts ->
            do_calc_stats_per_tag(Rest, Pos, {Tag, NC, NA + Val, NB, NM}, Acc);
        n_branches_executed ->
            do_calc_stats_per_tag(Rest, Pos, {Tag, NC, NA, NB + Val, NM}, Acc);
        n_micros ->
            do_calc_stats_per_tag(Rest, Pos, {Tag, NC, NA, NB, NM + Val}, Acc)
    end;
do_calc_stats_per_tag(GenStats, Pos, CalcStats, Acc) ->
    calc_stats_per_tag(GenStats, Pos, [CalcStats | Acc]).

%% Print the debug2-style per-transaction-type report (counts, TPS,
%% branch and rollback percentages). All-zero entries are suppressed.
display_trans_stats(Prefix, [{_Tag, 0, 0, 0, 0} | Rest], NT, Micros, NG) ->
    display_trans_stats(Prefix, Rest, NT, Micros, NG);
display_trans_stats(Prefix, [{Tag, NC, NA, NB, NM} | Rest], NT, Micros, NG) ->
    Common =
        fun(Name) ->
                Sec = NM / (1000000 * NG),
                io:format("  ~s: ~p (~p%) Time: ~p sec TPS = ~p~n",
                          [Name,
                           NC + NA,
                           round(((NC + NA) * 100) / NT),
                           round(Sec),
                           round((NC + NA) / Sec)])
        end,
    Branch =
        fun() ->
                io:format("      Branches Executed: ~p (~p%)~n",
                          [NB, round((NB * 100) / (NC + NA))])
        end,
    Rollback =
        fun() ->
                io:format("      Rollback Executed: ~p (~p%)~n",
                          [NA, round((NA * 100) / (NC + NA))])
        end,
    case Tag of
        t1 ->
            Common("T1");
        t2 ->
            Common("T2");
        t3 ->
            Common("T3"),
            Branch();
        t4 ->
            Common("T4"),
            Branch(),
            Rollback();
        t5 ->
            Common("T5"),
            Branch(),
            Rollback();
        _ ->
            Common(io_lib:format("~p", [Tag]))
    end,
    display_trans_stats(Prefix, Rest, NT, Micros, NG);
display_trans_stats(_, [], _, _, _) ->
    ok.

diff --git a/lib/mnesia/examples/bench/bench_populate.erl b/lib/mnesia/examples/bench/bench_populate.erl new file mode 100644 index 0000000000..f82ee210b6 --- /dev/null +++ b/lib/mnesia/examples/bench/bench_populate.erl @@ -0,0 +1,200 @@
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%% File    : bench_populate.erl
%%% Author  : Hakan Mattsson <[email protected]>
%%% Purpose : Populate the database
%%% Created : 21 Jun 2001 by Hakan Mattsson <[email protected]>
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

-module(bench_populate).
-author('[email protected]').

-include("bench.hrl").

%% Public
-export([start/1]).

%% Create the tables and populate them inside one sync_dirty activity.
start(C) when is_record(C, config) ->
    ?d("~n", []),
    ?d("Populate database...~n", []),
    ?d("~n", []),
    create_tables(C),
    Populate =
        fun() ->
                populate_subscriber(write, C),
                populate_group(write, C),
                populate_server(write, C)
        end,
    mnesia:activity(sync_dirty, Populate, [], mnesia_frag).

%% -------------------------------------------------------------------
%% Create the tables
%% -------------------------------------------------------------------

%% Drop any leftovers from a previous run, then create the plain group
%% table plus the fragmented suffix/subscriber/session/server tables.
create_tables(C) ->
    ?d("    Delete old tables...~n", []),
    mnesia:delete_table(group),
    mnesia:delete_table(subscriber),
    mnesia:delete_table(session),
    mnesia:delete_table(server),
    mnesia:delete_table(suffix),

    ?d("    Creating ~p tables, with ~p replicas distributed over ~p nodes...~n",
       [C#config.storage_type,
        C#config.n_replicas,
        length(C#config.table_nodes)]),

    %% Create group table
    GroupDef = [{C#config.storage_type, C#config.table_nodes},
                {attributes, record_info(fields, group)}],
    ?APPLY(mnesia, create_table, [group, GroupDef]),

    %% Create suffix table; all other fragmented tables hang off it via
    %% foreign keys so their fragments are co-located.
    FragStorage =
        case C#config.storage_type of
            ram_copies       -> n_ram_copies;
            disc_copies      -> n_disc_copies;
            disc_only_copies -> n_disc_only_copies
        end,
    FragProps =
        [{FragStorage, C#config.n_replicas},
         {node_pool, C#config.table_nodes},
         {n_fragments, C#config.n_fragments}],
    SuffixDef = [{frag_properties, FragProps}],
    ?APPLY(mnesia, create_table, [suffix, SuffixDef]),

    %% Create subscriber table
    SubscriberDef =
        [{frag_properties, [{foreign_key, {suffix, #subscriber.suffix}} | FragProps]},
         {attributes, record_info(fields, subscriber)}],
    ?APPLY(mnesia, create_table, [subscriber, SubscriberDef]),

    %% Create session table
    SessionDef =
        [{frag_properties, [{foreign_key, {suffix, #session.suffix}} | FragProps]},
         {attributes, record_info(fields, session)}],
    ?APPLY(mnesia, create_table, [session, SessionDef]),

    %% Create server table
    ServerDef =
        [{frag_properties, [{foreign_key, {suffix, #server.suffix}} | FragProps]},
         {attributes, record_info(fields, server)}],
    ?APPLY(mnesia, create_table, [server, ServerDef]).

%% -------------------------------------------------------------------
%% Populate the subscriber table
%% -------------------------------------------------------------------

populate_subscriber(Wlock, C) ->
    random:seed(),
    N = C#config.n_subscribers,
    ?d("    Populate ~p subscribers...", [N]),
    do_populate_subscriber(Wlock, N - 1, C).

%% Write one subscriber per id, counting down to 0.
do_populate_subscriber(Wlock, Id, C) when Id >= 0 ->
    Suffix = bench_trans:number_to_suffix(Id),
    SubscrId = bench_trans:number_to_key(Id, C),
    Name = list_to_binary([random:uniform(26) + $A - 1]),
    GroupId = random:uniform(C#config.n_groups) - 1,
    Subscr = #subscriber{subscriber_number = SubscrId,
                         subscriber_name   = Name,
                         group_id          = GroupId,
                         location          = 0,
                         active_sessions   = 0,
                         changed_by        = <<"">>,
                         changed_time      = <<"">>,
                         suffix            = Suffix},
    ?APPLY(mnesia, write, [subscriber, Subscr, Wlock]),
    do_populate_subscriber(Wlock, Id - 1, C);
do_populate_subscriber(_Wlock, _, _) ->
    %% Mnesia reports memory in words; 4 assumes a 32-bit word size.
    io:format(" totally ~p bytes~n",
              [mnesia:table_info(subscriber, memory) * 4]),
    ok.

%% -------------------------------------------------------------------
%% Populate the group table
%% -------------------------------------------------------------------

populate_group(Wlock, C) ->
    random:seed(),
    N = C#config.n_groups,
    ?d("    Populate ~p groups...", [N]),
    do_populate_group(Wlock, N - 1, C).

%% Write one group record per id, counting down to 0. Each permission
%% bitmask is drawn independently via init_allow/1.
do_populate_group(Wlock, Id, C) when Id >= 0 ->
    Name = list_to_binary(["-group ", integer_to_list(Id), "-"]),
    Allow = init_allow(C),
    Group = #group{group_id     = Id,
                   group_name   = Name,
                   allow_read   = Allow,
                   allow_insert = Allow,
                   allow_delete = Allow},
    ?APPLY(mnesia, write, [group, Group, Wlock]),
    do_populate_group(Wlock, Id - 1, C);
do_populate_group(_Wlock, _, _) ->
    io:format(" totally ~p bytes~n",
              [mnesia:table_info(group, memory) * 4]),
    ok.

%% Build a permission bitmask with one bit per server, each set with
%% ~90% probability.
init_allow(C) ->
    do_init_allow(0, C#config.n_servers - 1).

do_init_allow(Allow, NS) when NS >= 0 ->
    case random:uniform(100) < (90 + 1) of
        true ->
            ServerBit = 1 bsl NS,
            do_init_allow(Allow bor ServerBit, NS - 1);
        false ->
            do_init_allow(Allow, NS - 1)
    end;
do_init_allow(Allow, _) ->
    Allow.

%% -------------------------------------------------------------------
%% Populate the server table
%% -------------------------------------------------------------------

populate_server(Wlock, C) ->
    random:seed(),
    N = C#config.n_servers,
    ?d("    Populate ~p servers with 100 records each...", [N]),
    do_populate_server(Wlock, N - 1).

%% One server id at a time, 100 suffix records each.
do_populate_server(Wlock, Id) when Id >= 0 ->
    populate_server_suffixes(Wlock, Id, 99),
    do_populate_server(Wlock, Id - 1);
do_populate_server(_Wlock, _) ->
    io:format(" totally ~p bytes~n",
              [mnesia:table_info(server, memory) * 4]),
    ok.

%% Write the {Id, Suffix} server records for suffixes 99..0.
populate_server_suffixes(Wlock, Id, Suffix) when Suffix >= 0 ->
    Name = list_to_binary(["-server ", integer_to_list(Id), "-"]),
    Server = #server{server_key   = {Id, Suffix},
                     server_name  = Name,
                     no_of_read   = 0,
                     no_of_insert = 0,
                     no_of_delete = 0,
                     suffix       = Suffix},
    ?APPLY(mnesia, write, [server, Server, Wlock]),
    populate_server_suffixes(Wlock, Id, Suffix - 1);
populate_server_suffixes(_Wlock, _, _) ->
    ok.
+ diff --git a/lib/mnesia/examples/bench/bench_trans.erl b/lib/mnesia/examples/bench/bench_trans.erl new file mode 100644 index 0000000000..945715daae --- /dev/null +++ b/lib/mnesia/examples/bench/bench_trans.erl @@ -0,0 +1,184 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% File : bench_trans.hrl +%%% Author : Hakan Mattsson <[email protected]> +%%% Purpose : Implement the transactions in Canadian database benchmark (LMC/UU-01:025) +%%% Created : 21 Jun 2001 by Hakan Mattsson <[email protected]> +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-module(bench_trans). +-author('[email protected]'). + +-include("bench.hrl"). + +-export([ + update_current_location/5, + read_current_location/2, + read_session_details/4, + create_session_to_server/6, + delete_session_from_server/5, + number_to_suffix/1, + number_to_key/2 + ]). 

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% The transactions
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% -------------------------------------------------------------------
%% T1
%% -------------------------------------------------------------------

%% Rewrite a subscriber's location and change-tracking fields.
%% Always commits; the branch flag is false for T1.
update_current_location(Wlock, SubscrId, Location, ChangedBy, ChangedTime) ->
    Suffix = number_to_suffix(SubscrId),
    [Subscr] = mnesia:read({subscriber, Suffix}, SubscrId, Wlock),
    Subscr2 = Subscr#subscriber{location     = Location,
                                changed_by   = ChangedBy,
                                changed_time = ChangedTime},
    mnesia:write(subscriber, Subscr2, Wlock),
    {do_commit, false, [ok]}.

%% -------------------------------------------------------------------
%% T2
%% -------------------------------------------------------------------

%% Read a subscriber's name, location and change-tracking fields.
%% The write-lock argument is ignored; the read uses a read lock.
read_current_location(_Wlock, SubscrId) ->
    Suffix = number_to_suffix(SubscrId),
    [Subscr] = mnesia:read({subscriber, Suffix}, SubscrId, read),
    Name = Subscr#subscriber.subscriber_name,
    Location = Subscr#subscriber.location,
    ChangedBy = Subscr#subscriber.changed_by,
    ChangedTime = Subscr#subscriber.changed_time,
    {do_commit, false, [Name, Location, ChangedBy, ChangedTime]}.

%% -------------------------------------------------------------------
%% T3
%% -------------------------------------------------------------------

%% Read the session details for (SubscrId, ServerId). The branch is
%% only executed when the group allows reads for the server AND the
%% subscriber has an active session there; the server's read counter is
%% then bumped under Wlock.
read_session_details(Wlock, SubscrId, ServerBit, ServerId) ->
    Suffix = number_to_suffix(SubscrId),
    [Subscr] = mnesia:read({subscriber, Suffix}, SubscrId, read),
    %%[Group] = mnesia:read(group, Subscr#subscriber.group_id, read),
    [Group] = mnesia:dirty_read(group, Subscr#subscriber.group_id),

    IsAllowed = ((Group#group.allow_read band ServerBit) == ServerBit),
    IsActive = ((Subscr#subscriber.active_sessions band ServerBit) == ServerBit),
    ExecuteBranch = (IsAllowed and IsActive),

    case ExecuteBranch of
        true ->
            SessionKey = {SubscrId, ServerId},
            [Session] = mnesia:read({session, Suffix}, SessionKey, read),

            ServerKey = {ServerId, Suffix},
            [Server] = mnesia:read({server, Suffix}, ServerKey, Wlock),
            Server2 = Server#server{no_of_read = Server#server.no_of_read + 1},
            mnesia:write(server, Server2, Wlock),
            {do_commit, ExecuteBranch, [Session#session.session_details]};
        false ->
            {do_commit, ExecuteBranch, []}
    end.

%% -------------------------------------------------------------------
%% T4
%% -------------------------------------------------------------------

%% Create a session for (SubscrId, ServerId). The branch is executed
%% only when the group allows inserts AND no session is active for that
%% server bit; DoRollback forces an abort carrying the branch outcome.
create_session_to_server(Wlock, SubscrId, ServerBit, ServerId, Details, DoRollback) ->
    Suffix = number_to_suffix(SubscrId),
    [Subscr] = mnesia:read({subscriber, Suffix}, SubscrId, Wlock),
    %%[Group] = mnesia:read(group, Subscr#subscriber.group_id, read),
    [Group] = mnesia:dirty_read(group, Subscr#subscriber.group_id),

    IsAllowed = ((Group#group.allow_insert band ServerBit) == ServerBit),
    IsInactive = ((Subscr#subscriber.active_sessions band ServerBit) == 0),
    ExecuteBranch = (IsAllowed and IsInactive),
    case ExecuteBranch of
        true ->
            SessionKey = {SubscrId, ServerId},
            Session = #session{session_key     = SessionKey,
                               session_details = Details,
                               suffix          = Suffix},
            mnesia:write(session, Session, Wlock),
            %% Mark the session as active in the subscriber bitmask.
            Active = (Subscr#subscriber.active_sessions bor ServerBit),
            Subscr2 = Subscr#subscriber{active_sessions = Active},
            mnesia:write(subscriber, Subscr2, Wlock),

            ServerKey = {ServerId, Suffix},
            [Server] = mnesia:read({server, Suffix}, ServerKey, Wlock),
            Server2 = Server#server{no_of_insert = Server#server.no_of_insert + 1},
            mnesia:write(server, Server2, Wlock);
        false ->
            ignore
    end,
    case DoRollback of
        true ->
            mnesia:abort({do_rollback, ExecuteBranch, []});
        false ->
            {do_commit, ExecuteBranch, []}
    end.

%% -------------------------------------------------------------------
%% T5
%% -------------------------------------------------------------------

%% Delete the session between SubscrId and ServerId, clear the session
%% bit in the subscriber record and bump the server's delete counter.
%% The branch runs only when the group allows deletes on this server and
%% the session is currently active. When DoRollback is true the
%% transaction is aborted with a do_rollback reason.
delete_session_from_server(Wlock, SubscrId, ServerBit, ServerId, DoRollback) ->
    Suffix = number_to_suffix(SubscrId),
    [Subscr] = mnesia:read({subscriber, Suffix}, SubscrId, Wlock),
    %% Group data is static, so a dirty read is deliberately used here:
    %%[Group] = mnesia:read(group, Subscr#subscriber.group_id, read),
    [Group] = mnesia:dirty_read(group, Subscr#subscriber.group_id),

    Allowed = ((Group#group.allow_delete band ServerBit) == ServerBit),
    Active = ((Subscr#subscriber.active_sessions band ServerBit) == ServerBit),
    Branch = Allowed and Active,
    case Branch of
        true ->
            mnesia:delete({session, Suffix}, {SubscrId, ServerId}, Wlock),
            %% The bit is known to be set here, so bxor clears it
            Cleared = Subscr#subscriber.active_sessions bxor ServerBit,
            mnesia:write(subscriber, Subscr#subscriber{active_sessions = Cleared}, Wlock),

            [Server] = mnesia:read({server, Suffix}, {ServerId, Suffix}, Wlock),
            Bumped = Server#server.no_of_delete + 1,
            mnesia:write(server, Server#server{no_of_delete = Bumped}, Wlock);
        false ->
            ignore
    end,
    case DoRollback of
        true ->
            mnesia:abort({do_rollback, Branch, []});
        false ->
            {do_commit, Branch, []}
    end.

%% Map a subscriber id onto its table suffix: the last two decimal
%% digits, for an integer id as well as for the 10-byte ASCII binary
%% key produced by number_to_key/2.
number_to_suffix(SubscrId) when is_integer(SubscrId) ->
    SubscrId rem 100;
number_to_suffix(<<_:8/binary, Tens:8/integer, Ones:8/integer>>) ->
    (Tens - $0) * 10 + (Ones - $0).

%% Turn an integer id into the configured external key representation:
%% either a zero padded 10-byte binary or the plain integer.
number_to_key(Id, C) when is_integer(Id) ->
    case C#config.use_binary_subscriber_key of
        true ->
            list_to_binary(string:right(integer_to_list(Id), 10, $0));
        false ->
            Id
    end.
\ No newline at end of file diff --git a/lib/mnesia/examples/company.erl b/lib/mnesia/examples/company.erl new file mode 120000 index 0000000000..4b0c0b6bcc --- /dev/null +++ b/lib/mnesia/examples/company.erl @@ -0,0 +1 @@ +../doc/src/company.erl
\ No newline at end of file diff --git a/lib/mnesia/examples/company.hrl b/lib/mnesia/examples/company.hrl new file mode 120000 index 0000000000..95014d9781 --- /dev/null +++ b/lib/mnesia/examples/company.hrl @@ -0,0 +1 @@ +../doc/src/company.hrl
\ No newline at end of file diff --git a/lib/mnesia/examples/company_o.erl b/lib/mnesia/examples/company_o.erl new file mode 120000 index 0000000000..f4a40b768a --- /dev/null +++ b/lib/mnesia/examples/company_o.erl @@ -0,0 +1 @@ +../doc/src/company_o.erl
\ No newline at end of file diff --git a/lib/mnesia/examples/company_o.hrl b/lib/mnesia/examples/company_o.hrl new file mode 120000 index 0000000000..bfa57e37ea --- /dev/null +++ b/lib/mnesia/examples/company_o.hrl @@ -0,0 +1 @@ +../doc/src/company_o.hrl
\ No newline at end of file diff --git a/lib/mnesia/examples/mnesia_meter.erl b/lib/mnesia/examples/mnesia_meter.erl new file mode 100644 index 0000000000..ea74d8691b --- /dev/null +++ b/lib/mnesia/examples/mnesia_meter.erl @@ -0,0 +1,465 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Getting started: +%% +%% 1 Start one or more distributed Erlang nodes +%% 2a Connect the nodes, e.g. with net_adm:ping/1 +%% 3a Run mnesia_meter:go() +%% 3b Run mnesia_meter:go(ReplicaType) +%% 3c Run mnesia_meter:go(ReplicaType, Nodes) + +-module(mnesia_meter). +-author('[email protected]'). +-export([ + go/0, + go/1, + go/2, + repeat_meter/2 + ]). + +-record(person, {name, %% atomic, unique key + data, %% compound structure + married_to, %% name of partner or undefined + children}). %% list of children + +-record(meter, {desc, init, meter, micros}). + +-record(result, {desc, list}). + +-define(TIMES, 1000). + +go() -> + go(ram_copies). + +go(ReplicaType) -> + go(ReplicaType, [node() | nodes()]). + +go(ReplicaType, Nodes) -> + {ok, FunOverhead} = tc(fun(_) -> {atomic, ok} end, ?TIMES), + Size = size(term_to_binary(#person{})), + io:format("A fun apply costs ~p micro seconds. 
Record size is ~p bytes.~n", + [FunOverhead, Size]), + Res = go(ReplicaType, Nodes, [], FunOverhead, []), + NewRes = rearrange(Res, []), + DescHeader = lists:flatten(io_lib:format("~w on ~w", [ReplicaType, Nodes])), + ItemHeader = lists:seq(1, length(Nodes)), + Header = #result{desc = DescHeader, list = ItemHeader}, + SepList = ['--------' || _ <- Nodes], + Separator = #result{desc = "", list = SepList}, + display([Separator, Header, Separator | NewRes] ++ [Separator]). + +go(_ReplicaType, [], _Config, _FunOverhead, Acc) -> + Acc; +go(ReplicaType, [H | T], OldNodes, FunOverhead, Acc) -> + Nodes = [H | OldNodes], + Config = [{ReplicaType, Nodes}], + Res = run(Nodes, Config, FunOverhead), + go(ReplicaType, T, Nodes, FunOverhead, [{ReplicaType, Nodes, Res} | Acc]). + +rearrange([{_ReplicaType, _Nodes, Meters} | Tail], Acc) -> + Acc2 = [add_meter(M, Acc) || M <- Meters], + rearrange(Tail, Acc2); +rearrange([], Acc) -> + Acc. + +add_meter(M, Acc) -> + case lists:keysearch(M#meter.desc, #result.desc, Acc) of + {value, R} -> + R#result{list = [M#meter.micros | R#result.list]}; + false -> + #result{desc = M#meter.desc, list = [M#meter.micros]} + end. + +display(Res) -> + MaxDesc = lists:max([length(R#result.desc) || R <- Res]), + Format = lists:concat(["! ~-", MaxDesc, "s"]), + display(Res, Format, MaxDesc). + +display([R | Res], Format, MaxDesc) -> + case R#result.desc of + "" -> + io:format(Format, [lists:duplicate(MaxDesc, "-")]); + Desc -> + io:format(Format, [Desc]) + end, + display_items(R#result.list, R#result.desc), + io:format(" !~n", []), + display(Res, Format, MaxDesc); +display([], _Format, _MaxDesc) -> + ok. + +display_items([_Item | Items], "") -> + io:format(" ! ~s", [lists:duplicate(10, $-)]), + display_items(Items, ""); +display_items([Micros | Items], Desc) -> + io:format(" ! ~10w", [Micros]), + display_items(Items, Desc); +display_items([], _Desc) -> + ok. 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +meters() -> + [#meter{desc = "transaction update two records with read and write", + init = fun write_records/2, + meter = fun update_records/1}, + #meter{desc = "transaction update two records with wread and write", + init = fun write_records/2, + meter = fun w_update_records/1}, + #meter{desc = "transaction update two records with read and s_write", + init = fun s_write_records/2, + meter = fun s_update_records/1}, + #meter{desc = "sync_dirty update two records with read and write", + init = fun sync_dirty_write_records/2, + meter = fun sync_dirty_update_records/1}, + #meter{desc = "async_dirty update two records with read and write", + init = fun async_dirty_write_records/2, + meter = fun async_dirty_update_records/1}, + #meter{desc = "plain fun update two records with dirty_read and dirty_write", + init = fun dirty_write_records/2, + meter = fun dirty_update_records/1}, + #meter{desc = "ets update two records with read and write (local only)", + init = fun ets_opt_write_records/2, + meter = fun ets_update_records/1}, + #meter{desc = "plain fun update two records with ets:lookup and ets:insert (local only)", + init = fun bif_opt_write_records/2, + meter = fun bif_update_records/1}, + #meter{desc = "plain fun update two records with dets:lookup and dets:insert (local only)", + init = fun dets_opt_write_records/2, + meter = fun dets_update_records/1}, + + #meter{desc = "transaction write two records with write", + init = fun write_records/2, + meter = fun(X) -> write_records(X, 0-X) end}, + #meter{desc = "transaction write two records with s_write", + init = fun s_write_records/2, + meter = fun(X) -> s_write_records(X, 0-X) end}, + #meter{desc = "sync_dirty write two records with write", + init = fun sync_dirty_write_records/2, + meter = fun(X) -> sync_dirty_write_records(X, 0-X) end}, + #meter{desc = "async_dirty write two records with write", + init = fun async_dirty_write_records/2, + 
meter = fun(X) -> async_dirty_write_records(X, 0-X) end}, + #meter{desc = "plain fun write two records with dirty_write", + init = fun dirty_write_records/2, + meter = fun(X) -> dirty_write_records(X, 0-X) end}, + #meter{desc = "ets write two records with write (local only)", + init = fun ets_opt_write_records/2, + meter = fun(X) -> ets_write_records(X, 0-X) end}, + #meter{desc = "plain fun write two records with ets:insert (local only)", + init = fun bif_opt_write_records/2, + meter = fun(X) -> bif_write_records(X, 0-X) end}, + #meter{desc = "plain fun write two records with dets:insert (local only)", + init = fun dets_opt_write_records/2, + meter = fun(X) -> dets_write_records(X, 0-X) end}, + + #meter{desc = "transaction read two records with read", + init = fun write_records/2, + meter = fun(X) -> read_records(X, 0-X) end}, + #meter{desc = "sync_dirty read two records with read", + init = fun sync_dirty_write_records/2, + meter = fun(X) -> sync_dirty_read_records(X, 0-X) end}, + #meter{desc = "async_dirty read two records with read", + init = fun async_dirty_write_records/2, + meter = fun(X) -> async_dirty_read_records(X, 0-X) end}, + #meter{desc = "plain fun read two records with dirty_read", + init = fun dirty_write_records/2, + meter = fun(X) -> dirty_read_records(X, 0-X) end}, + #meter{desc = "ets read two records with read", + init = fun ets_opt_write_records/2, + meter = fun(X) -> ets_read_records(X, 0-X) end}, + #meter{desc = "plain fun read two records with ets:lookup", + init = fun bif_opt_write_records/2, + meter = fun(X) -> bif_read_records(X, 0-X) end}, + #meter{desc = "plain fun read two records with dets:lookup", + init = fun dets_opt_write_records/2, + meter = fun(X) -> dets_read_records(X, 0-X) end} + ]. 
+ +update_fun(Name) -> + fun() -> + case mnesia:read({person, Name}) of + [] -> + mnesia:abort(no_such_person); + [Pers] -> + [Partner] = mnesia:read({person, Pers#person.married_to}), + mnesia:write(Pers#person{married_to = undefined}), + mnesia:write(Partner#person{married_to = undefined}) + end + end. + +update_records(Name) -> + mnesia:transaction(update_fun(Name)). + +sync_dirty_update_records(Name) -> + {atomic, mnesia:sync_dirty(update_fun(Name))}. + +async_dirty_update_records(Name) -> + {atomic, mnesia:async_dirty(update_fun(Name))}. + +ets_update_records(Name) -> + {atomic, mnesia:ets(update_fun(Name))}. + +w_update_records(Name) -> + F = fun() -> + case mnesia:wread({person, Name}) of + [] -> + mnesia:abort(no_such_person); + [Pers] -> + [Partner] = mnesia:wread({person, Pers#person.married_to}), + mnesia:write(Pers#person{married_to = undefined}), + mnesia:write(Partner#person{married_to = undefined}) + end + end, + mnesia:transaction(F). + +s_update_records(Name) -> + F = fun() -> + case mnesia:read({person, Name}) of + [] -> + mnesia:abort(no_such_person); + [Pers] -> + [Partner] = mnesia:read({person, Pers#person.married_to}), + mnesia:s_write(Pers#person{married_to = undefined}), + mnesia:s_write(Partner#person{married_to = undefined}) + end + end, + mnesia:transaction(F). + +dirty_update_records(Name) -> + case mnesia:dirty_read({person, Name}) of + [] -> + mnesia:abort(no_such_person); + [Pers] -> + [Partner] = mnesia:dirty_read({person, Pers#person.married_to}), + mnesia:dirty_write(Pers#person{married_to = undefined}), + mnesia:dirty_write(Partner#person{married_to = undefined}) + end, + {atomic, ok}. + +bif_update_records(Name) -> + case ets:lookup(person, Name) of + [] -> + mnesia:abort(no_such_person); + [Pers] -> + [Partner] = ets:lookup(person, Pers#person.married_to), + ets:insert(person, Pers#person{married_to = undefined}), + ets:insert(person, Partner#person{married_to = undefined}) + end, + {atomic, ok}. 

%% Update via raw dets operations on the table behind mnesia.
dets_update_records(Name) ->
    case dets:lookup(person, Name) of
        [Person] ->
            [Spouse] = dets:lookup(person, Person#person.married_to),
            dets:insert(person, Person#person{married_to = undefined}),
            dets:insert(person, Spouse#person{married_to = undefined});
        [] ->
            mnesia:abort(no_such_person)
    end,
    {atomic, ok}.

%% Build a fun that writes a married couple with mnesia:write/1.
write_records_fun(Pers, Partner) ->
    fun() ->
            Template = #person{children = [ulla, bella]},
            mnesia:write(Template#person{name = Pers, married_to = Partner}),
            mnesia:write(Template#person{name = Partner, married_to = Pers})
    end.

%% The same double write executed under the different mnesia activities.
write_records(Pers, Partner) ->
    mnesia:transaction(write_records_fun(Pers, Partner)).

sync_dirty_write_records(Pers, Partner) ->
    {atomic, mnesia:sync_dirty(write_records_fun(Pers, Partner))}.

async_dirty_write_records(Pers, Partner) ->
    {atomic, mnesia:async_dirty(write_records_fun(Pers, Partner))}.

ets_write_records(Pers, Partner) ->
    {atomic, mnesia:ets(write_records_fun(Pers, Partner))}.

%% Transactional double write using sticky writes (s_write).
s_write_records(Pers, Partner) ->
    Fun = fun() ->
                  Template = #person{children = [ulla, bella]},
                  mnesia:s_write(Template#person{name = Pers, married_to = Partner}),
                  mnesia:s_write(Template#person{name = Partner, married_to = Pers})
          end,
    mnesia:transaction(Fun).

%% Double write with dirty operations, no transaction context.
dirty_write_records(Pers, Partner) ->
    Template = #person{children = [ulla, bella]},
    mnesia:dirty_write(Template#person{name = Pers, married_to = Partner}),
    mnesia:dirty_write(Template#person{name = Partner, married_to = Pers}),
    {atomic, ok}.

%% Only run the ets-based meter when the table is a single local
%% ram copy; otherwise signal 'skipped' to the measurement loop.
ets_opt_write_records(Pers, Partner) ->
    case mnesia:table_info(person, where_to_commit) of
        [{Node, ram_copies}] when Node == node() ->
            ets_write_records(Pers, Partner);
        _ ->
            throw(skipped)
    end.

%% Only run the raw-ets meter when the table is a single local
%% ram copy; otherwise signal 'skipped' to the measurement loop.
bif_opt_write_records(Pers, Partner) ->
    case mnesia:table_info(person, where_to_commit) of
        [{Node, ram_copies}] when Node == node() ->
            bif_write_records(Pers, Partner);
        _ ->
            throw(skipped)
    end.

%% Write the couple straight into the ets table behind mnesia.
bif_write_records(Pers, Partner) ->
    Template = #person{children = [ulla, bella]},
    ets:insert(person, Template#person{name = Pers, married_to = Partner}),
    ets:insert(person, Template#person{name = Partner, married_to = Pers}),
    {atomic, ok}.

%% Only run the dets-based meter when the table is a single local
%% disc_only copy; otherwise signal 'skipped' to the measurement loop.
dets_opt_write_records(Pers, Partner) ->
    case mnesia:table_info(person, where_to_commit) of
        [{Node, disc_only_copies}] when Node == node() ->
            dets_write_records(Pers, Partner);
        _ ->
            throw(skipped)
    end.

%% Write the couple straight into the dets table behind mnesia.
dets_write_records(Pers, Partner) ->
    Template = #person{children = [ulla, bella]},
    dets:insert(person, Template#person{name = Pers, married_to = Partner}),
    dets:insert(person, Template#person{name = Partner, married_to = Pers}),
    {atomic, ok}.

%% Build a fun that reads both partners and aborts unless both exist.
read_records_fun(Pers, Partner) ->
    fun() ->
            case {mnesia:read({person, Pers}),
                  mnesia:read({person, Partner})} of
                {[_], [_]} ->
                    ok;
                _ ->
                    mnesia:abort(no_such_person)
            end
    end.

%% The same double read executed under the different mnesia activities.
read_records(Pers, Partner) ->
    mnesia:transaction(read_records_fun(Pers, Partner)).

sync_dirty_read_records(Pers, Partner) ->
    {atomic, mnesia:sync_dirty(read_records_fun(Pers, Partner))}.

async_dirty_read_records(Pers, Partner) ->
    {atomic, mnesia:async_dirty(read_records_fun(Pers, Partner))}.

ets_read_records(Pers, Partner) ->
    {atomic, mnesia:ets(read_records_fun(Pers, Partner))}.

%% Double read with dirty operations, no transaction context.
dirty_read_records(Pers, Partner) ->
    case {mnesia:dirty_read({person, Pers}),
          mnesia:dirty_read({person, Partner})} of
        {[_], [_]} ->
            {atomic, ok};
        _ ->
            mnesia:abort(no_such_person)
    end.

%% Double read straight from the ets table behind mnesia.
bif_read_records(Pers, Partner) ->
    case {ets:lookup(person, Pers),
          ets:lookup(person, Partner)} of
        {[_], [_]} ->
            {atomic, ok};
        _ ->
            mnesia:abort(no_such_person)
    end.

%% Double read straight from the dets table behind mnesia.
dets_read_records(Pers, Partner) ->
    case {dets:lookup(person, Pers),
          dets:lookup(person, Partner)} of
        {[_], [_]} ->
            {atomic, ok};
        _ ->
            mnesia:abort(no_such_person)
    end.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

%% Set up fresh tables on Nodes according to Config, execute every
%% meter and return the list of #meter{} records with micros filled in.
run(Nodes, Config, FunOverhead) ->
    Meters = meters(),
    io:format("Run ~w meters with table config: ~w~n", [length(Meters), Config]),
    rpc:multicall(Nodes, mnesia, lkill, []),
    start(Nodes, Config),
    Res = [run_meter(Data, Nodes, FunOverhead) || Data <- Meters],
    stop(Nodes),
    Res.

%% Execute a single meter: populate the table via the meter's init fun,
%% flush the transaction log out of the way, then time the meter fun.
%% On failure the failure reason is stored in the micros field instead.
%%
%% FIX: the original head used `when record(M, meter)'; the record/2
%% guard BIF has long been deprecated and removed — matching #meter{}
%% directly in the head is the supported equivalent.
run_meter(#meter{} = M, Nodes, FunOverhead) ->
    io:format(".", []),
    case catch init_records(M#meter.init, ?TIMES) of
        {atomic, ok} ->
            %% Dump the log now so it does not disturb the timing below
            rpc:multicall(Nodes, mnesia, dump_log, []),
            case tc(M#meter.meter, ?TIMES) of
                {ok, Micros} ->
                    %% Never report a negative time after overhead removal
                    M#meter{micros = lists:max([0, Micros - FunOverhead])};
                {error, Reason} ->
                    M#meter{micros = Reason}
            end;
        Res ->
            %% init threw (e.g. 'skipped') or crashed; report that instead
            M#meter{micros = Res}
    end.

%% Create a fresh schema and start Mnesia on all Nodes, then create the
%% person table according to Config. The huge dump thresholds keep the
%% log dumper quiet during the measurements.
start(Nodes, Config) ->
    mnesia:delete_schema(Nodes),
    ok = mnesia:create_schema(Nodes),
    Args = [[{dump_log_write_threshold, ?TIMES div 2},
             {dump_log_time_threshold, timer:hours(10)}]],
    lists:foreach(fun(Node) -> rpc:call(Node, mnesia, start, Args) end, Nodes),
    Attrs = record_info(fields, person),
    TabDef = [{attributes, Attrs} | Config],
    {atomic, _} = mnesia:create_table(person, TabDef).

%% Stop Mnesia on all nodes.
stop(Nodes) ->
    rpc:multicall(Nodes, mnesia, stop, []).

%% Generate some dummy persons; Fun is the meter's init fun and must
%% return {atomic, ok} for every couple it writes.
init_records(_Fun, 0) ->
    {atomic, ok};
init_records(Fun, Times) ->
    {atomic, ok} = Fun(Times, 0 - Times),
    init_records(Fun, Times - 1).

%% Time Fun over Times iterations. Returns {ok, MeanMicrosPerIter} or
%% {error, Reason} when an iteration returned an error or crashed.
tc(Fun, Times) ->
    case catch timer:tc(?MODULE, repeat_meter, [Fun, Times]) of
        {Micros, ok} ->
            {ok, Micros div Times};
        {_Micros, {error, Reason}} ->
            {error, Reason};
        {'EXIT', Reason} ->
            {error, Reason}
    end.

%% The meter must return {atomic, ok}
repeat_meter(Meter, Times) ->
    repeat_meter(Meter, {atomic, ok}, Times).

repeat_meter(_, {atomic, ok}, 0) ->
    ok;
repeat_meter(Meter, {atomic, _Result}, Times) when Times > 0 ->
    repeat_meter(Meter, Meter(Times), Times - 1);
repeat_meter(_Meter, Reason, _Times) ->
    {error, Reason}.
+ diff --git a/lib/mnesia/examples/mnesia_tpcb.erl b/lib/mnesia/examples/mnesia_tpcb.erl new file mode 100644 index 0000000000..903c53a21c --- /dev/null +++ b/lib/mnesia/examples/mnesia_tpcb.erl @@ -0,0 +1,1268 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% +%% MODULE +%% +%% mnesia_tpcb - TPC-B benchmarking of Mnesia +%% +%% DESCRIPTION +%% +%% The metrics used in the TPC-B benchmark are throughput as measured +%% in transactions per second (TPS). The benchmark uses a single, +%% simple update-intensive transaction to load the database system. +%% The single transaction type provides a simple, repeatable +%% unit of work, and is designed to exercise the basic components of +%% a database system. +%% +%% The definition of the TPC-B states lots of detailed rules and +%% conditions that must be fullfilled, e.g. how the ACID (atomicity, +%% consistency, isolation and durability) properties are verified, +%% how the random numbers must be distributed, minimum sizes of +%% the different types of records, minimum duration of the benchmark, +%% formulas to calculate prices (dollars per tps), disclosure issues +%% etc. Please, see http://www.tpc.org/ about the nitty gritty details. +%% +%% The TPC-B benchmark is stated in terms of a hypothetical bank. 
The +%% bank has one or more branches. Each branch has multiple tellers. The +%% bank has many customers, each with an account. The database represents +%% the cash position of each entity (branch, teller and account) and a +%% history of recent transactions run by the bank. The transaction +%% represents the work done when a customer makes a deposit or a +%% withdrawal against his account. The transaction is performed by a +%% teller at some branch. +%% +%% Each process that performs TPC-B transactions is called a driver. +%% Drivers generates teller_id, account_id and delta amount of +%% money randomly. An account, a teller and a branch are read, their +%% balances are adjusted and a history record is created. The driver +%% measures the time for 3 reads, 3 writes and 1 create. +%% +%% GETTING STARTED +%% +%% Generate tables and run with default configuration: +%% +%% mnesia_tpcb:start(). +%% +%% A little bit more advanced; +%% +%% spawn(mnesia_tpcb, start, [[[{n_drivers_per_node, 8}, {stop_after, infinity}]]), +%% mnesia_tpcb:stop(). +%% +%% Really advanced; +%% +%% mnesia_tpcb:init(([{n_branches, 8}, {replica_type, disc_only_copies}]), +%% mnesia_tpcb:run(([{n_drivers_per_node, 8}]), +%% mnesia_tpcb:run(([{n_drivers_per_node, 64}]). +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-module(mnesia_tpcb). +-author('[email protected]'). + +-export([ + config/2, + count_balance/0, + driver_init/2, + init/1, + reporter_init/2, + run/1, + start/0, + start/1, + start/2, + stop/0, + real_trans/5, + verify_tabs/0, + reply_gen_branch/3, + frag_add_delta/7, + + conflict_test/1, + dist_test/1, + replica_test/1, + sticky_replica_test/1, + remote_test/1, + remote_frag2_test/1 + ]). + +-define(SECOND, 1000000). 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Account record, total size must be at least 100 bytes + +-define(ACCOUNT_FILLER, + {123456789012345678901234567890123456789012345678901234567890, + 123456789012345678901234567890123456789012345678901234567890, + 123456789012345678901234567890123456789012345678901234}). + +-record(account, + { + id = 0, % Unique account id + branch_id = 0, % Branch where the account is held + balance = 0, % Account balance + filler = ?ACCOUNT_FILLER % Gap filler to ensure size >= 100 bytes + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Branch record, total size must be at least 100 bytes + +-define(BRANCH_FILLER, + {123456789012345678901234567890123456789012345678901234567890, + 123456789012345678901234567890123456789012345678901234567890, + 123456789012345678901234567890123456789012345678901234567890}). + +-record(branch, + { + id = 0, % Unique branch id + balance = 0, % Total balance of whole branch + filler = ?BRANCH_FILLER % Gap filler to ensure size >= 100 bytes + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% Teller record, total size must be at least 100 bytes + +-define(TELLER_FILLER, + {123456789012345678901234567890123456789012345678901234567890, + 123456789012345678901234567890123456789012345678901234567890, + 1234567890123456789012345678901234567890123456789012345678}). + +-record(teller, + { + id = 0, % Unique teller id + branch_id = 0, % Branch where the teller is located + balance = 0, % Teller balance + filler = ?TELLER_FILLER % Gap filler to ensure size >= 100 bytes + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%%% History record, total size must be at least 50 bytes + +-define(HISTORY_FILLER, 1234567890). 
+ +-record(history, + { + history_id = {0, 0}, % {DriverId, DriverLocalHistoryid} + time_stamp = now(), % Time point during active transaction + branch_id = 0, % Branch associated with teller + teller_id = 0, % Teller invlolved in transaction + account_id = 0, % Account updated by transaction + amount = 0, % Amount (delta) specified by transaction + filler = ?HISTORY_FILLER % Gap filler to ensure size >= 50 bytes + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +-record(tab_config, + { + db_nodes = [node()], + n_replicas = 1, % Ignored for non-fragmented tables + replica_nodes = [node()], + replica_type = ram_copies, + use_running_mnesia = false, + n_fragments = 0, + n_branches = 1, + n_tellers_per_branch = 10, % Must be 10 + n_accounts_per_branch = 100000, % Must be 100000 + branch_filler = ?BRANCH_FILLER, + account_filler = ?ACCOUNT_FILLER, + teller_filler = ?TELLER_FILLER + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(run_config, + { + driver_nodes = [node()], + n_drivers_per_node = 1, + use_running_mnesia = false, + stop_after = timer:minutes(15), % Minimum 15 min + report_interval = timer:minutes(1), + use_sticky_locks = false, + spawn_near_branch = false, + activity_type = transaction, + reuse_history_id = false + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(time, + { + n_trans = 0, + min_n = 0, + max_n = 0, + acc_time = 0, + max_time = 0 + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(driver_state, + { + driver_id, + driver_node, + seed, + n_local_branches, + local_branches, + tab_config, + run_config, + history_id, + time = #time{}, + acc_time = #time{}, + reuse_history_id + }). 
+ +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-record(reporter_state, + { + driver_pids, + starter_pid, + n_iters = 0, + prev_tps = 0, + curr = #time{}, + acc = #time{}, + init_micros, + prev_micros, + run_config + }). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on each node, table not replicated + +config(frag_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {n_branches, length(Nodes)}, + {n_fragments, length(Nodes)}, + {replica_nodes, Nodes}, + {db_nodes, Nodes}, + {driver_nodes, Nodes}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on each node, table replicated to two nodes. + +config(frag2_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {n_branches, length(Nodes)}, + {n_fragments, length(Nodes)}, + {n_replicas, 2}, + {replica_nodes, Nodes}, + {db_nodes, Nodes}, + {driver_nodes, Nodes}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on this node, table replicated to all nodes. + +config(replica_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {db_nodes, Nodes}, + {driver_nodes, [Local]}, + {replica_nodes, Nodes}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on this node, table replicated to all nodes. 
+ +config(sticky_replica_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {db_nodes, Nodes}, + {driver_nodes, [node()]}, + {replica_nodes, Nodes}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {use_sticky_locks, true}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Ten drivers per node, tables replicated to all nodes, lots of branches + +config(dist_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {db_nodes, Nodes}, + {driver_nodes, Nodes}, + {replica_nodes, Nodes}, + {n_drivers_per_node, 10}, + {n_branches, 10 * length(Nodes) * 100}, + {n_accounts_per_branch, 10}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Ten drivers per node, tables replicated to all nodes, single branch + +config(conflict_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {db_nodes, Nodes}, + {driver_nodes, Nodes}, + {replica_nodes, Nodes}, + {n_drivers_per_node, 10}, + {n_branches, 1}, + {n_accounts_per_branch, 10}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on this node, table replicated to all other nodes. 
+ +config(remote_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {db_nodes, Nodes}, + {driver_nodes, [Local]}, + {replica_nodes, Remote}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]; + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% One driver on this node, table replicated to two other nodes. + +config(remote_frag2_test, ReplicaType) -> + Remote = nodes(), + Local = node(), + Nodes = [Local | Remote], + [ + {n_branches, length(Remote)}, + {n_fragments, length(Remote)}, + {n_replicas, 2}, + {replica_nodes, Remote}, + {db_nodes, Nodes}, + {driver_nodes, [Local]}, + {n_accounts_per_branch, 100}, + {replica_type, ReplicaType}, + {stop_after, timer:minutes(1)}, + {report_interval, timer:seconds(10)}, + {reuse_history_id, true} + ]. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start(What, ReplicaType) -> + spawn_link(?MODULE, start, [config(What, ReplicaType)]). + +replica_test(ReplicaType) -> + start(replica_test, ReplicaType). + +sticky_replica_test(ReplicaType) -> + start(sticky_replica_test, ReplicaType). + +dist_test(ReplicaType) -> + start(dist_test, ReplicaType). + +conflict_test(ReplicaType) -> + start(conflict_test, ReplicaType). + +remote_test(ReplicaType) -> + start(remote_test, ReplicaType). + +remote_frag2_test(ReplicaType) -> + start(remote_frag2_test, ReplicaType). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Args is a list of {Key, Val} tuples where Key is a field name +%% in either the record tab_config or run_config. Unknown keys are ignored. + +start() -> + start([]). +start(Args) -> + init(Args), + run(Args). + +list2rec(List, Fields, DefaultTuple) -> + [Name|Defaults] = tuple_to_list(DefaultTuple), + List2 = list2rec(List, Fields, Defaults, []), + list_to_tuple([Name] ++ List2). 
+
+%% Walk Fields and Defaults in parallel, picking the value from List
+%% when the key is present and the default otherwise. Consumed keys are
+%% deleted from List so duplicate keys use only the first occurrence.
+list2rec(_List, [], [], Acc) ->
+    Acc;
+list2rec(List, [F|Fields], [D|Defaults], Acc) ->
+    {Val, List2} =
+        case lists:keysearch(F, 1, List) of
+            false ->
+                {D, List};
+            {value, {F, NewVal}} ->
+                {NewVal, lists:keydelete(F, 1, List)}
+        end,
+    list2rec(List2, Fields, Defaults, Acc ++ [Val]).
+
+%% Stop a running benchmark via its registered reporter process.
+stop() ->
+    case whereis(mnesia_tpcb) of
+        undefined ->
+            {error, not_running};
+        Pid ->
+            sync_stop(Pid)
+    end.
+
+%% Ask the reporter to stop and wait for its result; after one minute
+%% give up and kill it brutally.
+sync_stop(Pid) ->
+    Pid ! {self(), stop},
+    receive
+        {Pid, {stopped, Res}} -> Res
+    after timer:minutes(1) ->
+            exit(Pid, kill),
+            {error, brutal_kill}
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Initialization
+
+%% Args is a list of {Key, Val} tuples where Key is a field name
+%% in the record tab_config, unknown keys are ignored.
+
+%% Build the #tab_config{}, then (unless use_running_mnesia) kill any
+%% running Mnesia on all db nodes, recreate the schema from scratch,
+%% start Mnesia everywhere and finally generate the benchmark tables.
+init(Args) ->
+    TabConfig0 = list2rec(Args, record_info(fields, tab_config), #tab_config{}),
+    %% For unfragmented tables every replica node holds a full copy.
+    TabConfig =
+        if
+            TabConfig0#tab_config.n_fragments =:= 0 ->
+                TabConfig0#tab_config{n_replicas = length(TabConfig0#tab_config.replica_nodes)};
+            true ->
+                TabConfig0
+        end,
+    Tags = record_info(fields, tab_config),
+    Fun = fun(F, Pos) -> {{F, element(Pos, TabConfig)}, Pos + 1} end,
+    {List, _} = lists:mapfoldl(Fun, 2, Tags),
+    io:format("TPC-B: Table config: ~p ~n", [List]),
+
+    DbNodes = TabConfig#tab_config.db_nodes,
+    stop(),
+    if
+        TabConfig#tab_config.use_running_mnesia =:= true ->
+            ignore;
+        true ->
+            rpc:multicall(DbNodes, mnesia, lkill, []),
+            case mnesia:delete_schema(DbNodes) of
+                ok ->
+                    case mnesia:create_schema(DbNodes) of
+                        ok ->
+                            {Replies, BadNodes} =
+                                rpc:multicall(DbNodes, mnesia, start, []),
+                            case [Res || Res <- Replies, Res =/= ok] of
+                                [] when BadNodes =:= [] ->
+                                    ok;
+                                BadRes ->
+                                    io:format("TPC-B: <ERROR> "
+                                              "Failed to start ~p: ~p~n",
+                                              [BadNodes, BadRes]),
+                                    exit({start_failed, BadRes, BadNodes})
+                            end;
+                        {error, Reason} ->
+                            io:format("TPC-B: <ERROR> "
+                                      "Failed to create schema on disc: ~p~n",
+                                      [Reason]),
+                            exit({create_schema_failed, Reason})
+                    end;
+                {error, Reason} ->
+                    io:format("TPC-B: <ERROR> "
+                              "Failed to delete schema on disc: ~p~n",
+                              [Reason]),
+                    exit({delete_schema_failed, Reason})
+            end
+    end,
+    gen_tabs(TabConfig).
+
+%% Create the four TPC-B tables (branch, teller, account, history),
+%% populate them branch by branch and verify the resulting balances.
+%% account/teller/history have a foreign key to branch so that, when
+%% fragmented, their fragments are co-located with the branch fragments.
+gen_tabs(TC) ->
+    create_tab(TC, branch, record_info(fields, branch),
+               undefined),
+    create_tab(TC, account, record_info(fields, account),
+               {branch, #account.branch_id}),
+    create_tab(TC, teller, record_info(fields, teller),
+               {branch, #teller.branch_id}),
+    create_tab(TC, history, record_info(fields, history),
+               {branch, #history.branch_id}),
+
+    NB = TC#tab_config.n_branches,
+    NT = TC#tab_config.n_tellers_per_branch,
+    NA = TC#tab_config.n_accounts_per_branch,
+    io:format("TPC-B: Generating ~p branches a ~p bytes~n",
+              [NB, size(term_to_binary(default_branch(TC)))]),
+    io:format("TPC-B: Generating ~p * ~p tellers a ~p bytes~n",
+              [NB, NT, size(term_to_binary(default_teller(TC)))]),
+    io:format("TPC-B: Generating ~p * ~p accounts a ~p bytes~n",
+              [NB, NA, size(term_to_binary(default_account(TC)))]),
+    io:format("TPC-B: Generating 0 history records a ~p bytes~n",
+              [size(term_to_binary(default_history(TC)))]),
+    gen_branches(TC),
+
+    case verify_tabs() of
+        ok ->
+            ignore;
+        {error, Reason} ->
+            io:format("TPC-B: <ERROR> Inconsistent tables: ~w~n",
+                      [Reason]),
+            exit({inconsistent_tables, Reason})
+    end.
+
+%% Table definition: plain replicated table when n_fragments is 0,
+%% otherwise a fragmented table (frag_properties) with the given
+%% foreign key and number of replicas per fragment.
+create_tab(TC, Name, Attrs, _ForeignKey) when TC#tab_config.n_fragments =:= 0 ->
+    Nodes = TC#tab_config.replica_nodes,
+    Type = TC#tab_config.replica_type,
+    Def = [{Type, Nodes}, {attributes, Attrs}],
+    create_tab(Name, Def);
+create_tab(TC, Name, Attrs, ForeignKey) ->
+    NReplicas = TC#tab_config.n_replicas,
+    NodePool = TC#tab_config.replica_nodes,
+    Type = TC#tab_config.replica_type,
+    NF = TC#tab_config.n_fragments,
+    Props = [{n_fragments, NF},
+             {node_pool, NodePool},
+             {n_copies(Type), NReplicas},
+             {foreign_key, ForeignKey}],
+    Def = [{frag_properties, Props},
+           {attributes, Attrs}],
+    create_tab(Name, Def).
+
+%% Drop any old table with the same name and create it anew; exits on
+%% failure since the benchmark cannot run without its tables.
+create_tab(Name, Def) ->
+    mnesia:delete_table(Name),
+    case mnesia:create_table(Name, Def) of
+        {atomic, ok} ->
+            ok;
+        {aborted, Reason} ->
+            io:format("TPC-B: <ERROR> failed to create table ~w ~w: ~p~n",
+                      [Name, Def, Reason]),
+            exit({create_table_failed, Reason})
+    end.
+
+%% Map a storage type to its frag_properties replica-count key.
+n_copies(Type) ->
+    case Type of
+        ram_copies -> n_ram_copies;
+        disc_copies -> n_disc_copies;
+        disc_only_copies -> n_disc_only_copies
+    end.
+
+%% Spawn one generator process per branch (on a node holding that
+%% branch) and wait for all of them to finish.
+gen_branches(TC) ->
+    First = 0,
+    Last = First + TC#tab_config.n_branches - 1,
+    GenPids = gen_branches(TC, First, Last, []),
+    wait_for_gen(GenPids).
+
+%% Collect one {branch_generated, Pid} per generator; any other message
+%% (e.g. an 'EXIT' from a linked generator) aborts the benchmark.
+wait_for_gen([]) ->
+    ok;
+wait_for_gen(Pids) ->
+    receive
+        {branch_generated, Pid} -> wait_for_gen(lists:delete(Pid, Pids));
+        Exit ->
+            exit({tpcb_failed, Exit})
+    end.
+
+%% Spawn a generator for each branch id in [BranchId, Last], spreading
+%% the load over nodes via get_branch_nodes/2. UsedNs accumulates the
+%% nodes picked so far so the least-used node is chosen next time.
+gen_branches(TC, BranchId, Last, UsedNs) when BranchId =< Last ->
+    UsedNs2 = get_branch_nodes(BranchId, UsedNs),
+    Node = hd(UsedNs2),
+    Pid = spawn_link(Node, ?MODULE, reply_gen_branch,
+                     [self(), TC, BranchId]),
+    [Pid | gen_branches(TC, BranchId + 1, Last, UsedNs2)];
+gen_branches(_, _, _, _) ->
+    [].
+
+%% Remote entry point: generate one branch, notify the parent, unlink.
+reply_gen_branch(ReplyTo, TC, BranchId) ->
+    gen_branch(TC, BranchId),
+    ReplyTo ! {branch_generated, self()},
+    unlink(ReplyTo).
+
+%% Returns a new list of nodes with the best node as head
+%% ("best" = the writable node for this branch that occurs fewest
+%% times in UsedNs, i.e. has generated the fewest branches so far).
+get_branch_nodes(BranchId, UsedNs) ->
+    WriteNs = table_info({branch, BranchId}, where_to_write),
+    WeightedNs = [{n_duplicates(N, UsedNs, 0), N} || N <- WriteNs],
+    [{_, LeastUsed} | _ ] = lists:sort(WeightedNs),
+    [LeastUsed | UsedNs].
+
+%% Count occurrences of N in the list.
+n_duplicates(_N, [], Count) ->
+    Count;
+n_duplicates(N, [N | Tail], Count) ->
+    n_duplicates(N, Tail, Count + 1);
+n_duplicates(N, [_ | Tail], Count) ->
+    n_duplicates(N, Tail, Count).
+
+%% Populate one branch: its accounts and tellers (ids are contiguous
+%% per branch: BranchId*N .. BranchId*N+N-1) and finally the branch
+%% record itself. Accounts/tellers are written with async_dirty for
+%% speed; the branch record with sync_dirty so generation completes
+%% on all replicas before the parent is notified.
+gen_branch(TC, BranchId) ->
+    A = default_account(TC),
+    NA = TC#tab_config.n_accounts_per_branch,
+    FirstA = BranchId * NA,
+    ArgsA = [FirstA, FirstA + NA - 1, BranchId, A],
+    ok = mnesia:activity(async_dirty, fun gen_accounts/4, ArgsA, mnesia_frag),
+
+    T = default_teller(TC),
+    NT = TC#tab_config.n_tellers_per_branch,
+    FirstT = BranchId * NT,
+    ArgsT = [FirstT, FirstT + NT - 1, BranchId, T],
+    ok = mnesia:activity(async_dirty, fun gen_tellers/4, ArgsT, mnesia_frag),
+
+    B = default_branch(TC),
+    FunB = fun() -> mnesia:write(branch, B#branch{id = BranchId}, write) end,
+    ok = mnesia:activity(sync_dirty, FunB, [], mnesia_frag).
+
+%% Write teller records with ids Id..Last belonging to BranchId.
+gen_tellers(Id, Last, BranchId, T) when Id =< Last ->
+    mnesia:write(teller, T#teller{id = Id, branch_id=BranchId}, write),
+    gen_tellers(Id + 1, Last, BranchId, T);
+gen_tellers(_, _, _, _) ->
+    ok.
+
+%% Write account records with ids Id..Last belonging to BranchId.
+gen_accounts(Id, Last, BranchId, A) when Id =< Last ->
+    mnesia:write(account, A#account{id = Id, branch_id=BranchId}, write),
+    gen_accounts(Id + 1, Last, BranchId, A);
+gen_accounts(_, _, _, _) ->
+    ok.
+
+%% Template records; the filler fields pad the records to the sizes
+%% configured in #tab_config{}.
+default_branch(TC) -> #branch{filler = TC#tab_config.branch_filler}.
+default_teller(TC) -> #teller{filler = TC#tab_config.teller_filler}.
+default_account(TC) -> #account{filler = TC#tab_config.account_filler}.
+default_history(_TC) -> #history{}.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%%% Run the benchmark
+
+%% Args is a list of {Key, Val} tuples where Key is a field name
+%% in the record run_config, unknown keys are ignored.
+%% Run the benchmark with the given {Key, Val} options (keys are
+%% fields of #run_config{}; unknown keys are ignored). Spawns the
+%% reporter process and waits until it reports a result, another
+%% process stops it, or stop_after milliseconds have elapsed.
+run(Args) ->
+    RunConfig = list2rec(Args, record_info(fields, run_config), #run_config{}),
+    Tags = record_info(fields, run_config),
+    Fun = fun(F, Pos) -> {{F, element(Pos, RunConfig)}, Pos + 1} end,
+    {List, _} = lists:mapfoldl(Fun, 2, Tags),
+    io:format("TPC-B: Run config: ~p ~n", [List]),
+
+    Pid = spawn_link(?MODULE, reporter_init, [self(), RunConfig]),
+    receive
+        {Pid, {stopped, Res}} ->
+            Res; % Stopped by other process
+        Else ->
+            {tpcb_got, Else}
+    after RunConfig#run_config.stop_after ->
+            sync_stop(Pid)
+    end.
+
+%% Reporter process: registers as mnesia_tpcb, (re)starts Mnesia on
+%% all db nodes unless a running system is reused, derives the actual
+%% table configuration from the tables themselves, starts the driver
+%% processes and enters the periodic reporting loop. Always replies
+%% {self(), {stopped, Res}} to its starter before exiting.
+reporter_init(Starter, RC) ->
+    register(mnesia_tpcb, self()),
+    process_flag(trap_exit, true),
+    DbNodes = mnesia:system_info(db_nodes),
+    if
+        RC#run_config.use_running_mnesia =:= true ->
+            ignore;
+        true ->
+            {Replies, BadNodes} =
+                rpc:multicall(DbNodes, mnesia, start, []),
+            case [Res || Res <- Replies, Res =/= ok] of
+                [] when BadNodes =:= [] ->
+                    ok;
+                BadRes ->
+                    io:format("TPC-B: <ERROR> "
+                              "Failed to start ~w: ~p~n",
+                              [BadNodes, BadRes]),
+                    exit({start_failed, BadRes, BadNodes})
+            end,
+            verify_tabs()
+    end,
+
+    %% Recover table sizes from the populated tables (tellers and
+    %% accounts are evenly distributed over the branches).
+    N = table_info(branch, size),
+    NT = table_info(teller, size) div N,
+    NA = table_info(account, size) div N,
+
+    {Type, NF, RepNodes} = table_storage(branch),
+    TC = #tab_config{n_fragments = NF,
+                     n_branches = N,
+                     n_tellers_per_branch = NT,
+                     n_accounts_per_branch = NA,
+                     db_nodes = DbNodes,
+                     replica_nodes = RepNodes,
+                     replica_type = Type
+                    },
+    Drivers = start_drivers(RC, TC),
+    Now = now_to_micros(erlang:now()),
+    State = #reporter_state{driver_pids = Drivers,
+                            run_config = RC,
+                            starter_pid = Starter,
+                            init_micros = Now,
+                            prev_micros = Now
+                           },
+    case catch reporter_loop(State) of
+        {'EXIT', Reason} ->
+            io:format("TPC-B: Abnormal termination: ~p~n", [Reason]),
+            if
+                RC#run_config.use_running_mnesia =:= true ->
+                    ignore;
+                true ->
+                    rpc:multicall(DbNodes, mnesia, lkill, [])
+            end,
+            unlink(Starter),
+            Starter ! {self(), {stopped, {error, Reason}}}, % To be sure
+            exit(shutdown);
+        {ok, Stopper, State2} ->
+            Time = State2#reporter_state.acc,
+            Res =
+                case verify_tabs() of
+                    ok ->
+                        {ok, Time};
+                    {error, Reason} ->
+                        io:format("TPC-B: <ERROR> Inconsistent tables, ~p~n",
+                                  [{error, Reason}]),
+                        {error, Reason}
+                end,
+            if
+                RC#run_config.use_running_mnesia =:= true ->
+                    ignore;
+                true ->
+                    rpc:multicall(DbNodes, mnesia, stop, [])
+            end,
+            unlink(Starter),
+            Starter ! {self(), {stopped, Res}},
+            if
+                Stopper =/= Starter ->
+                    Stopper ! {self(), {stopped, Res}};
+                true ->
+                    ignore
+            end,
+            exit(shutdown)
+    end.
+
+%% Fragmentation-aware mnesia:table_info/2 (runs in a mnesia_frag
+%% activity so fragment-level items like frag_names work).
+table_info(Tab, Item) ->
+    Fun = fun() -> mnesia:table_info(Tab, Item) end,
+    mnesia:activity(sync_dirty, Fun, mnesia_frag).
+
+%% Returns {Storage, NFragments, ReplicaNodes}
+table_storage(Tab) ->
+    %% BUGFIX: used to inspect the hard-coded table 'branch' instead of
+    %% the Tab argument (harmless for the current caller, which passes
+    %% branch, but wrong for any other table).
+    case mnesia:table_info(Tab, frag_properties) of
+        [] ->
+            NFO = 0,
+            NR = length(mnesia:table_info(Tab, ram_copies)),
+            ND = length(mnesia:table_info(Tab, disc_copies)),
+            NDO = length(mnesia:table_info(Tab, disc_only_copies)),
+            if
+                NR =/= 0 -> {ram_copies, NFO, NR};
+                ND =/= 0 -> {disc_copies, NFO, ND};
+                %% BUGFIX: disc_only tables were mislabeled disc_copies.
+                NDO =/= 0 -> {disc_only_copies, NFO, NDO}
+            end;
+        Props ->
+            %% BUGFIX: keysearch yields {value, {n_fragments, N}}; the
+            %% old code bound NFO to the whole {n_fragments, N} tuple
+            %% instead of the integer fragment count.
+            {value, {n_fragments, NFO}} = lists:keysearch(n_fragments, 1, Props),
+            NR = table_info(Tab, n_ram_copies),
+            ND = table_info(Tab, n_disc_copies),
+            NDO = table_info(Tab, n_disc_only_copies),
+            if
+                NR =/= 0 -> {ram_copies, NFO, NR};
+                ND =/= 0 -> {disc_copies, NFO, ND};
+                %% BUGFIX: disc_only tables were mislabeled disc_copies.
+                NDO =/= 0 -> {disc_only_copies, NFO, NDO}
+            end
+    end.
+
+%% Periodic reporting loop: every report_interval ms it polls the
+%% drivers and prints statistics; a {From, stop} message ends the run
+%% and an 'EXIT' from the starter aborts it.
+reporter_loop(State) ->
+    RC = State#reporter_state.run_config,
+    receive
+        {From, stop} ->
+            {ok, From, call_drivers(State, stop)};
+        {'EXIT', Pid, Reason} when Pid =:= State#reporter_state.starter_pid ->
+            %% call_drivers(State, stop),
+            exit({starter_died, Pid, Reason})
+    after RC#run_config.report_interval ->
+            Iters = State#reporter_state.n_iters,
+            State2 = State#reporter_state{n_iters = Iters + 1},
+            case call_drivers(State2, report) of
+                State3 when State3#reporter_state.driver_pids =/= [] ->
+                    %% Reset the per-interval counters for the next round.
+                    State4 = State3#reporter_state{curr = #time{}},
+                    reporter_loop(State4);
+                _ ->
+                    exit(drivers_died)
+            end
+    end.
+
+%% Send Msg (report | stop) to all drivers, fold their #time{} replies
+%% into the state and print a report. On stop, also print a final
+%% summary over the whole run (accumulated since init_micros).
+call_drivers(State, Msg) ->
+    Drivers = State#reporter_state.driver_pids,
+    lists:foreach(fun(Pid) -> Pid ! {self(), Msg} end, Drivers),
+    State2 = show_report(calc_reports(Drivers, State)),
+    case Msg =:= stop of
+        true ->
+            Acc = State2#reporter_state.acc,
+            Init = State2#reporter_state.init_micros,
+            show_report(State2#reporter_state{n_iters = 0,
+                                              curr = Acc,
+                                              prev_micros = Init});
+        false ->
+            ignore
+    end,
+    State2.
+
+%% Receive one #time{} reply per driver and add it into both the
+%% accumulated (acc) and current-interval (curr) statistics. A driver
+%% or starter 'EXIT' aborts the run.
+calc_reports([], State) ->
+    State;
+calc_reports([Pid|Drivers], State) ->
+    receive
+        {'EXIT', P, Reason} when P =:= State#reporter_state.starter_pid ->
+            exit({starter_died, P, Reason});
+        {'EXIT', Pid, Reason} ->
+            exit({driver_died, Pid, Reason});
+        {Pid, Time} when is_record(Time, time) ->
+            %% io:format("~w: ~w~n", [Pid, Time]),
+            A = add_time(State#reporter_state.acc, Time),
+            C = add_time(State#reporter_state.curr, Time),
+            State2 = State#reporter_state{acc = A, curr = C},
+            calc_reports(Drivers, State2)
+    end.
+
+%% Merge two #time{} samples: sum counts and accumulated time, track
+%% min/max transaction counts (the "-- [0]" drops at most one zero so
+%% an empty sample does not pin min_n to 0) and the longest single
+%% transaction.
+add_time(Acc, New) ->
+    Acc#time{n_trans = New#time.n_trans + Acc#time.n_trans,
+             min_n = lists:min([New#time.n_trans, Acc#time.min_n] -- [0]),
+             max_n = lists:max([New#time.n_trans, Acc#time.max_n]),
+             acc_time = New#time.acc_time + Acc#time.acc_time,
+             max_time = lists:max([New#time.max_time, Acc#time.max_time])}.
+
+%% Evaluate _What_ but return 0 on any exception (used for the many
+%% ratios below that may divide by zero early in a run).
+-define(AVOID_DIV_ZERO(_What_), try (_What_) catch _:_ -> 0 end).
+
+%% Print one statistics line for the current interval (or, when
+%% n_iters is 0, the final whole-run summary) and roll prev_tps /
+%% prev_micros forward for the next interval.
+show_report(State) ->
+    Now = now_to_micros(erlang:now()),
+    Iters = State#reporter_state.n_iters,
+    Time = State#reporter_state.curr,
+    Max = Time#time.max_time,
+    N = Time#time.n_trans,
+    %% Netto average: micros per transaction as measured inside the drivers.
+    Avg = ?AVOID_DIV_ZERO(Time#time.acc_time div N),
+    AliveN = length(State#reporter_state.driver_pids),
+    Tps = ?AVOID_DIV_ZERO((?SECOND * AliveN) div Avg),
+    PrevTps= State#reporter_state.prev_tps,
+    {DiffSign, DiffTps} = signed_diff(Iters, Tps, PrevTps),
+    %% Ratio between the busiest and laziest driver in this interval.
+    Unfairness = ?AVOID_DIV_ZERO(Time#time.max_n / Time#time.min_n),
+    %% Brutto average: wall-clock micros per transaction since last report.
+    BruttoAvg = ?AVOID_DIV_ZERO((Now - State#reporter_state.prev_micros) div N),
+%%    io:format("n_iters=~p, n_trans=~p, n_drivers=~p, avg=~p, now=~p, prev=~p~n",
+%%	      [Iters, N, AliveN, BruttoAvg, Now, State#reporter_state.prev_micros]),
+    BruttoTps = ?AVOID_DIV_ZERO(?SECOND div BruttoAvg),
+    case Iters > 0 of
+        true ->
+            io:format("TPC-B: ~p iter ~s~p diff ~p (~p) tps ~p avg micros ~p max micros ~p unfairness~n",
+                      [Iters, DiffSign, DiffTps, Tps, BruttoTps, Avg, Max, Unfairness]);
+        false ->
+            io:format("TPC-B: ~p (~p) transactions per second, "
+                      "duration of longest transaction was ~p milliseconds~n",
+                      [Tps, BruttoTps, Max div 1000])
+    end,
+    State#reporter_state{prev_tps = Tps, prev_micros = Now}.
+
+%% Signed delta against the previous interval; suppressed (0) for the
+%% first interval where there is no meaningful previous value.
+signed_diff(Iters, Curr, Prev) ->
+    case Iters > 1 of
+        true -> sign(Curr - Prev);
+        false -> sign(0)
+    end.
+
+%% Split a number into a printable sign prefix and its magnitude-as-is.
+sign(N) when N > 0 -> {"+", N};
+sign(N) -> {"", N}.
+
+%% Convert an erlang:now()-style {Mega, Secs, Micros} timestamp into
+%% absolute microseconds (via the Gregorian calendar).
+now_to_micros({Mega, Secs, Micros}) ->
+    DT = calendar:now_to_datetime({Mega, Secs, 0}),
+    S = calendar:datetime_to_gregorian_seconds(DT),
+    (S * ?SECOND) + Micros.
+
+%% Spawn n_drivers_per_node driver processes on each configured driver
+%% node. Branches are assigned to drivers that run on a node holding a
+%% writable replica of that branch ("local branches"); branches with no
+%% local driver are reported as orphans. Returns the driver pids.
+start_drivers(RC, TC) ->
+    LastHistoryId = table_info(history, size),
+    Reuse = RC#run_config.reuse_history_id,
+    DS = #driver_state{tab_config = TC,
+                       run_config = RC,
+                       n_local_branches = 0,
+                       local_branches = [],
+                       history_id = LastHistoryId,
+                       reuse_history_id = Reuse},
+    Nodes = RC#run_config.driver_nodes,
+    NB = TC#tab_config.n_branches,
+    First = 0,
+    AllBranches = lists:seq(First, First + NB - 1),
+    ND = RC#run_config.n_drivers_per_node,
+    Spawn = fun(Spec) ->
+                    Node = Spec#driver_state.driver_node,
+                    spawn_link(Node, ?MODULE, driver_init, [Spec, AllBranches])
+            end,
+    Specs = [DS#driver_state{driver_id = Id, driver_node = N}
+             || N <- Nodes,
+                Id <- lists:seq(1, ND)],
+    Specs2 = lists:sort(lists:flatten(Specs)),
+    {Specs3, OrphanBranches} = alloc_local_branches(AllBranches, Specs2, []),
+    %% Print the orphan list itself only when it is short.
+    case length(OrphanBranches) of
+        N when N =< 10 ->
+            io:format("TPC-B: Orphan branches: ~p~n", [OrphanBranches]);
+        N ->
+            io:format("TPC-B: Orphan branches: ~p~n", [N])
+    end,
+    [Spawn(Spec) || Spec <- Specs3].
+
+%% Greedy assignment: each branch goes to the driver (among those on a
+%% node where the branch is writable) that currently owns the fewest
+%% local branches. Branches with no eligible driver become orphans.
+alloc_local_branches([BranchId | Tail], Specs, OrphanBranches) ->
+    Nodes = table_info({branch, BranchId}, where_to_write),
+    LocalSpecs = [DS || DS <- Specs,
+                        lists:member(DS#driver_state.driver_node, Nodes)],
+    case lists:keysort(#driver_state.n_local_branches, LocalSpecs) of
+        [] ->
+            alloc_local_branches(Tail, Specs, [BranchId | OrphanBranches]);
+        [DS | _] ->
+            LocalNB = DS#driver_state.n_local_branches + 1,
+            LocalBranches = [BranchId | DS#driver_state.local_branches],
+            DS2 = DS#driver_state{n_local_branches = LocalNB,
+                                  local_branches = LocalBranches},
+            Specs2 = Specs -- [DS],
+            Specs3 = [DS2 | Specs2],
+            alloc_local_branches(Tail, Specs3, OrphanBranches)
+    end;
+alloc_local_branches([], Specs, OrphanBranches) ->
+    {Specs, OrphanBranches}.
+
+%% Driver entry point: seed the per-driver random state and, if the
+%% driver was given no local branches, let it use all branches.
+driver_init(DS, AllBranches) ->
+    Seed = erlang:now(),
+    DS2 =
+        if
+            DS#driver_state.n_local_branches =:= 0 ->
+                DS#driver_state{seed = Seed,
+                                n_local_branches = length(AllBranches),
+                                local_branches = AllBranches};
+            true ->
+                DS#driver_state{seed = Seed}
+        end,
+    io:format("TPC-B: Driver ~p started as ~p on node ~p with ~p local branches~n",
+              [DS2#driver_state.driver_id, self(), node(), DS2#driver_state.n_local_branches]),
+    driver_loop(DS2).
+
+%% Main driver loop: run transactions back-to-back (the 'after 0'
+%% branch) and answer the reporter's report/stop requests. On report
+%% the per-interval #time{} is handed over and reset; on stop the
+%% driver exits after sending its final sample.
+driver_loop(DS) ->
+    receive
+        {From, report} ->
+            From ! {self(), DS#driver_state.time},
+            Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time),
+            DS2 = DS#driver_state{time=#time{}, acc_time = Acc}, % Reset timer
+            DS3 = calc_trans(DS2),
+            driver_loop(DS3);
+        {From, stop} ->
+            Acc = add_time(DS#driver_state.time, DS#driver_state.acc_time),
+            io:format("TPC-B: Driver ~p (~p) on node ~p stopped: ~w~n",
+                      [DS#driver_state.driver_id, self(), node(self()), Acc]),
+            From ! {self(), DS#driver_state.time},
+            unlink(From),
+            exit(stopped)
+    after 0 ->
+            DS2 = calc_trans(DS),
+            driver_loop(DS2)
+    end.
+
+%% Run one timed transaction and fold its duration into the driver's
+%% #time{} stats; bump history_id unless history ids are reused.
+calc_trans(DS) ->
+    {Micros, DS2} = time_trans(DS),
+    Time = DS2#driver_state.time,
+    Time2 = Time#time{n_trans = Time#time.n_trans + 1,
+                      acc_time = Time#time.acc_time + Micros,
+                      max_time = lists:max([Micros, Time#time.max_time])
+                     },
+    case DS#driver_state.reuse_history_id of
+        false ->
+            HistoryId = DS#driver_state.history_id + 1,
+            DS2#driver_state{time=Time2, history_id = HistoryId};
+        true ->
+            DS2#driver_state{time=Time2}
+    end.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% Generate teller_id, account_id and delta
+%% Time the TPC-B transaction
+%% Swaps the driver's own random seed in and out of the process
+%% dictionary so the 'random' module state is not shared with Mnesia.
+%% Exits if the transaction does not return an integer balance.
+time_trans(DS) ->
+    OldSeed = get(random_seed), % Avoid interference with Mnesia
+    put(random_seed, DS#driver_state.seed),
+    Random = random:uniform(),
+    NewSeed = get(random_seed),
+    case OldSeed of
+        undefined -> erase(random_seed);
+        _ -> put(random_seed, OldSeed)
+    end,
+
+    TC = DS#driver_state.tab_config,
+    RC = DS#driver_state.run_config,
+    {Branchid, Args} = random_to_args(Random, DS),
+    {Fun, Mod} = trans_type(TC, RC),
+    {Time, Res} = timer:tc(?MODULE, real_trans, [RC, Branchid, Fun, Args, Mod]),
+
+    case Res of
+        AccountBal when is_integer(AccountBal) ->
+            {Time, DS#driver_state{seed = NewSeed}};
+        Other ->
+            exit({crash, Other, Args, Random, DS})
+    end.
+
+%% Derive the transaction arguments from one uniform random number:
+%% a branch among the driver's local branches, a teller in that
+%% branch, and (15% of the time when several branches exist) an
+%% account in a remote branch, otherwise a local account.
+random_to_args(Random, DS) ->
+    DriverId = DS#driver_state.driver_id,
+    TC = DS#driver_state.tab_config,
+    HistoryId = DS#driver_state.history_id,
+    Delta = trunc(Random * 1999998) - 999999, % -999999 =< Delta =< +999999
+
+    Branches = DS#driver_state.local_branches,
+    NB = DS#driver_state.n_local_branches,
+    NT = TC#tab_config.n_tellers_per_branch,
+    NA = TC#tab_config.n_accounts_per_branch,
+    Tmp = trunc(Random * NB * NT),
+    BranchPos = (Tmp div NT) + 1,
+    BranchId =
+        case TC#tab_config.n_fragments of
+            0 -> BranchPos - 1;
+            _ -> lists:nth(BranchPos, Branches)
+        end,
+    %% NOTE(review): 'Tmp div NT' equals BranchPos - 1, so the
+    %% "relative" teller id can reach NB - 1 and exceed NT - 1 when
+    %% NB > NT; 'Tmp rem NT' looks intended here — confirm before
+    %% changing, benchmark results depend on the access pattern.
+    RelativeTellerId = Tmp div NT,
+    TellerId = (BranchId * NT) + RelativeTellerId,
+    {AccountBranchId, AccountId} =
+        if
+            Random >= 0.85, NB > 1 ->
+                %% Pick from a remote account
+                TmpAccountId= trunc(Random * (NB - 1) * NA),
+                TmpAccountBranchId = TmpAccountId div NA,
+                %% Skip past the local branch so the pick is truly remote.
+                if
+                    TmpAccountBranchId =:= BranchId ->
+                        {TmpAccountBranchId + 1, TmpAccountId + NA};
+                    true ->
+                        {TmpAccountBranchId, TmpAccountId}
+                end;
+            true ->
+                %% Pick from a local account
+                RelativeAccountId = trunc(Random * NA),
+                TmpAccountId = (BranchId * NA) + RelativeAccountId,
+                {BranchId, TmpAccountId}
+        end,
+
+    {BranchId, [DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta]}.
+
+%% Execute the transaction, either locally or (spawn_near_branch) via
+%% rpc on a node holding a readable replica of the branch.
+real_trans(RC, BranchId, Fun, Args, Mod) ->
+    Type = RC#run_config.activity_type,
+    case RC#run_config.spawn_near_branch of
+        false ->
+            mnesia:activity(Type, Fun, Args, Mod);
+        true ->
+            Node = table_info({branch, BranchId}, where_to_read),
+            case rpc:call(Node, mnesia, activity, [Type, Fun, Args, Mod]) of
+                {badrpc, Reason} -> exit(Reason);
+                Other -> Other
+            end
+    end.
+
+%% Select transaction fun and access module from the table/run config.
+%% Note: no clause covers fragmented tables combined with sticky locks;
+%% that combination fails with if_clause by construction.
+trans_type(TC, RC) ->
+    if
+        TC#tab_config.n_fragments =:= 0,
+        RC#run_config.use_sticky_locks =:= false ->
+            {fun add_delta/7, mnesia};
+        TC#tab_config.n_fragments =:= 0,
+        RC#run_config.use_sticky_locks =:= true ->
+            {fun sticky_add_delta/7, mnesia};
+        TC#tab_config.n_fragments > 0,
+        RC#run_config.use_sticky_locks =:= false ->
+            {fun frag_add_delta/7, mnesia_frag}
+    end.
+
+%%
+%% Runs the TPC-B defined transaction and returns NewAccountBalance
+%%
+
+add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
+    %% Grab write lock already when the record is read
+
+    %% Add delta to branch balance
+    [B] = mnesia:read(branch, BranchId, write),
+    NewB = B#branch{balance = B#branch.balance + Delta},
+    ok = mnesia:write(branch, NewB, write),
+
+    %% Add delta to teller balance
+    [T] = mnesia:read(teller, TellerId, write),
+    NewT = T#teller{balance = T#teller.balance + Delta},
+    ok = mnesia:write(teller, NewT, write),
+
+    %% Add delta to account balance
+    [A] = mnesia:read(account, AccountId, write),
+    NewA = A#account{balance = A#account.balance + Delta},
+    ok = mnesia:write(account, NewA, write),
+
+    %% Append to history log
+    History = #history{history_id = {DriverId, HistoryId},
+                       account_id = AccountId,
+                       teller_id = TellerId,
+                       branch_id = BranchId,
+                       amount = Delta
+                      },
+    ok = mnesia:write(history, History, write),
+
+    %% Return account balance
+    NewA#account.balance.
+
+%% Same TPC-B transaction as add_delta/7 but using sticky write locks;
+%% returns the new account balance.
+sticky_add_delta(DriverId, BranchId, TellerId, _AccountBranchId, AccountId, HistoryId, Delta) ->
+    %% Grab ordinary read lock when the record is read
+    %% Grab sticky write lock when the record is written
+    %% This transaction would benefit from an early sticky_write lock at read
+
+    %% Add delta to branch balance
+    [B] = mnesia:read(branch, BranchId, read),
+    NewB = B#branch{balance = B#branch.balance + Delta},
+    ok = mnesia:write(branch, NewB, sticky_write),
+
+    %% Add delta to teller balance
+    [T] = mnesia:read(teller, TellerId, read),
+    NewT = T#teller{balance = T#teller.balance + Delta},
+    ok = mnesia:write(teller, NewT, sticky_write),
+
+    %% Add delta to account balance
+    [A] = mnesia:read(account, AccountId, read),
+    NewA = A#account{balance = A#account.balance + Delta},
+    ok = mnesia:write(account, NewA, sticky_write),
+
+    %% Append to history log
+    History = #history{history_id = {DriverId, HistoryId},
+                       account_id = AccountId,
+                       teller_id = TellerId,
+                       branch_id = BranchId,
+                       amount = Delta
+                      },
+    ok = mnesia:write(history, History, sticky_write),
+
+    %% Return account balance
+    NewA#account.balance.
+
+%% Same TPC-B transaction as add_delta/7 but for fragmented tables
+%% (record-only writes; mnesia_frag routes them to the right fragment,
+%% teller/account reads are keyed by their branch for fragment lookup).
+frag_add_delta(DriverId, BranchId, TellerId, AccountBranchId, AccountId, HistoryId, Delta) ->
+    %% Access fragmented table
+    %% Grab write lock already when the record is read
+
+    %% Add delta to branch balance
+    [B] = mnesia:read(branch, BranchId, write),
+    NewB = B#branch{balance = B#branch.balance + Delta},
+    ok = mnesia:write(NewB),
+
+    %% Add delta to teller balance
+    [T] = mnesia:read({teller, BranchId}, TellerId, write),
+    NewT = T#teller{balance = T#teller.balance + Delta},
+    ok = mnesia:write(NewT),
+
+    %% Add delta to account balance
+    %%io:format("frag_add_delta(~p): ~p\n", [node(), {account, BranchId, AccountId}]),
+    [A] = mnesia:read({account, AccountBranchId}, AccountId, write),
+    NewA = A#account{balance = A#account.balance + Delta},
+    ok = mnesia:write(NewA),
+
+    %% Append to history log
+    History = #history{history_id = {DriverId, HistoryId},
+                       account_id = AccountId,
+                       teller_id = TellerId,
+                       branch_id = BranchId,
+                       amount = Delta
+                      },
+    ok = mnesia:write(History),
+
+    %% Return account balance
+    NewA#account.balance.
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Verify table consistency
+
+%% Check that all replicas on all running db nodes agree on the total
+%% balances. Runs inside a transaction holding write locks on all four
+%% tables so no updates can interleave with the balance counting.
+verify_tabs() ->
+    Nodes = mnesia:system_info(running_db_nodes),
+    case lists:member(node(), Nodes) of
+        true ->
+            Tabs = [branch, teller, account, history],
+            io:format("TPC-B: Verifying tables: ~w~n", [Tabs]),
+            rpc:multicall(Nodes, mnesia, wait_for_tables, [Tabs, infinity]),
+
+            Fun = fun() ->
+                          mnesia:write_lock_table(branch),
+                          mnesia:write_lock_table(teller),
+                          mnesia:write_lock_table(account),
+                          mnesia:write_lock_table(history),
+                          {Res, BadNodes} =
+                              rpc:multicall(Nodes, ?MODULE, count_balance, []),
+                          check_balance(Res, BadNodes)
+                  end,
+            case mnesia:transaction(Fun) of
+                {atomic, Res} -> Res;
+                {aborted, Reason} -> {error, Reason}
+            end;
+        false ->
+            {error, "Must be initiated from a running db_node"}
+    end.
+
+%% Returns a list of {Table, Node, Balance} tuples
+%% Assumes that no updates are performed
+
+-record(summary, {table, node, balance, size}).
+
+%% Summarize the three balance-carrying tables on this node.
+count_balance() ->
+    [count_balance(branch, #branch.balance),
+     count_balance(teller, #teller.balance),
+     count_balance(account, #account.balance)].
+
+%% Sum the balance field (at tuple position BalPos) over all fragments
+%% of Tab, also counting the records.
+count_balance(Tab, BalPos) ->
+    Frags = table_info(Tab, frag_names),
+    count_balance(Tab, Frags, 0, 0, BalPos).
+
+%% Fold balance and size over each fragment via dirty key traversal.
+count_balance(Tab, [Frag | Frags], Bal, Size, BalPos) ->
+    First = mnesia:dirty_first(Frag),
+    {Bal2, Size2} = count_frag_balance(Frag, First, Bal, Size, BalPos),
+    count_balance(Tab, Frags, Bal2, Size2, BalPos);
+count_balance(Tab, [], Bal, Size, _BalPos) ->
+    #summary{table = Tab, node = node(), balance = Bal, size = Size}.
+
+%% Walk one fragment with dirty_first/dirty_next, accumulating the
+%% balance field and the record count.
+count_frag_balance(_Frag, '$end_of_table', Bal, Size, _BalPos) ->
+    {Bal, Size};
+count_frag_balance(Frag, Key, Bal, Size, BalPos) ->
+    [Record] = mnesia:dirty_read({Frag, Key}),
+    Bal2 = Bal + element(BalPos, Record),
+    Next = mnesia:dirty_next(Frag, Key),
+    count_frag_balance(Frag, Next, Bal2, Size + 1, BalPos).
+
+%% All summaries from all nodes must report the same balance (TPC-B
+%% invariant: every transaction adds the same Delta to branch, teller
+%% and account); aborts the surrounding transaction otherwise, or if
+%% any node failed to answer.
+check_balance([], []) ->
+    mnesia:abort({"No balance"});
+check_balance(Summaries, []) ->
+    [One | Rest] = lists:flatten(Summaries),
+    Balance = One#summary.balance,
+    %% Size = One#summary.size,
+    case [S || S <- Rest, S#summary.balance =/= Balance] of
+        [] ->
+            ok;
+        BadSummaries ->
+            mnesia:abort({"Bad balance", One, BadSummaries})
+    end;
+check_balance(_, BadNodes) ->
+    mnesia:abort({"Bad nodes", BadNodes}).