diff options
author | Kjell Winblad <[email protected]> | 2019-04-30 14:41:16 +0200 |
---|---|---|
committer | Kjell Winblad <[email protected]> | 2019-06-18 15:44:53 +0200 |
commit | f707dc058dc0aa880d6f2604acb7b420b082a69c (patch) | |
tree | 6a791414fbb6375443f7e9a49c3a150ee3b583b6 /lib | |
parent | 6618ce7b6a621e92db72ea4f01f7d38553c8818c (diff) | |
download | otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.tar.gz otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.tar.bz2 otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.zip |
ETS ordered_set: Improvements to the CA tree implementation
This commit only affects the implementation of ETS `ordered_set`
tables with the `write_concurrency` option enabled. Such tables are
implemented with a data structure that is called the contention
adapting search tree (CA tree). This commit introduces the following
changes:
* This commit causes a join to be triggered in one randomly selected
base node in about one of 1000 read unlock calls for base node
locks. No such joins happened before this commit. Before this
commit, operations that only acquired looks in read-mode never
triggered any contention adaptation. Therefore, the CA tree could
get stuck in a sub-optimal state in certain scenarios. This could
happen, for example, when a CA tree is first populated with parallel
inserts (which will cause splits of base nodes) and then only
read-only operations are applied to the data structure. Benchmark
results from the
`ets_SUITE:lookup_catree_par_vs_seq_init_benchmark/0` benchmark
function (which is included in this commit) shows that this change
can improve the throughput of the CA tree in the scenario described
above.
* Read-only operations will now also increase values of statistics
counters when they detect that they need to wait for other
operations. Only write operation changed statistics counters before
this commit. This improves the statistics that the adaptation
heuristics is based on.
* Additionally, this commit adds an upper and lower limit to the
contention statistics variables in the base nodes. Such limits did
not exist before this commit. This should, for example, make the CA
tree more responsive to contention after long periods of low
contention.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/stdlib/test/ets_SUITE.erl | 225 |
1 files changed, 179 insertions, 46 deletions
diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl index dd49288417..09238ae2b4 100644 --- a/lib/stdlib/test/ets_SUITE.erl +++ b/lib/stdlib/test/ets_SUITE.erl @@ -75,7 +75,8 @@ -export([throughput_benchmark/0, throughput_benchmark/1, test_throughput_benchmark/1, - long_throughput_benchmark/1]). + long_throughput_benchmark/1, + lookup_catree_par_vs_seq_init_benchmark/0]). -export([exit_large_table_owner/1, exit_many_large_table_owner/1, exit_many_tables_owner/1, @@ -6728,6 +6729,14 @@ do_work(WorksDoneSoFar, Table, ProbHelpTab, Range, Operations) -> end. prefill_table(T, KeyRange, Num, ObjFun) -> + Parent = self(), + spawn_link(fun() -> + prefill_table_helper(T, KeyRange, Num, ObjFun), + Parent ! done + end), + receive done -> ok end. + +prefill_table_helper(T, KeyRange, Num, ObjFun) -> Seed = rand:uniform(KeyRange), %%io:format("prefill_table: Seed = ~p\n", [Seed]), RState = unique_rand_start(KeyRange, Seed), @@ -6740,11 +6749,77 @@ prefill_table_loop(T, RS0, N, ObjFun) -> ets:insert(T, ObjFun(Key)), prefill_table_loop(T, RS1, N-1, ObjFun). +inserter_proc_starter(T, ToInsert, Parent) -> + receive + start -> ok + end, + inserter_proc(T, ToInsert, [], Parent, false). + +inserter_proc(T, [], Inserted, Parent, _) -> + inserter_proc(T, Inserted, [], Parent, true); +inserter_proc(T, [I | ToInsert], Inserted, Parent, CanStop) -> + Stop = + case CanStop of + true -> + receive + stop -> Parent ! stopped + after 0 -> no_stop + end; + false -> no_stop + end, + case Stop of + no_stop -> + ets:insert(T, I), + inserter_proc(T, ToInsert, [I | Inserted], Parent, CanStop); + _ -> ok + end. + +prefill_table_parallel(T, KeyRange, Num, ObjFun) -> + Parent = self(), + spawn_link(fun() -> + prefill_table_parallel_helper(T, KeyRange, Num, ObjFun), + Parent ! done + end), + receive done -> ok end. + +prefill_table_parallel_helper(T, KeyRange, Num, ObjFun) -> + NrOfSchedulers = erlang:system_info(schedulers), + Seed = rand:uniform(KeyRange), + %%io:format("prefill_table: Seed = ~p\n", [Seed]), + RState = unique_rand_start(KeyRange, Seed), + InsertMap = prefill_insert_map_loop(T, RState, Num, ObjFun, #{}, NrOfSchedulers), + Self = self(), + Pids = [ + begin + InserterFun = + fun() -> + inserter_proc_starter(T, ToInsert, Self) + end, + spawn_link(InserterFun) + end + || ToInsert <- maps:values(InsertMap)], + [Pid ! start || Pid <- Pids], + timer:sleep(1000), + [Pid ! stop || Pid <- Pids], + [receive stopped -> ok end || _Pid <- Pids]. + +prefill_insert_map_loop(_, _, 0, _, InsertMap, _NrOfSchedulers) -> + InsertMap; +prefill_insert_map_loop(T, RS0, N, ObjFun, InsertMap, NrOfSchedulers) -> + {Key, RS1} = unique_rand_next(RS0), + Sched = N rem NrOfSchedulers, + PrevInserts = maps:get(Sched, InsertMap, []), + NewPrevInserts = [ObjFun(Key) | PrevInserts], + NewInsertMap = maps:put(Sched, NewPrevInserts, InsertMap), + prefill_insert_map_loop(T, RS1, N-1, ObjFun, NewInsertMap, NrOfSchedulers). + -record(ets_throughput_bench_config, {benchmark_duration_ms = 3000, recover_time_ms = 1000, thread_counts = not_set, key_ranges = [1000000], + init_functions = [fun prefill_table/4], + nr_of_repeats = 1, scenarios = [ [ @@ -6838,7 +6913,7 @@ prefill_table_loop(T, RS0, N, ObjFun) -> notify_res_fun = fun(_Name, _Throughput) -> ok end, print_result_paths_fun = fun(ResultPath, _LatestResultPath) -> - Comment = + Comment = io_lib:format("<a href=\"file:///~s\">Result visualization</a>",[ResultPath]), {comment, Comment} end @@ -6848,7 +6923,7 @@ stdout_notify_res(ResultPath, LatestResultPath) -> io:format("Result Location: /~s~n", [ResultPath]), io:format("Latest Result Location: ~s~n", [LatestResultPath]). -throughput_benchmark() -> +throughput_benchmark() -> throughput_benchmark( #ets_throughput_bench_config{ print_result_paths_fun = fun stdout_notify_res/2}). @@ -6856,9 +6931,11 @@ throughput_benchmark() -> throughput_benchmark( #ets_throughput_bench_config{ benchmark_duration_ms = BenchmarkDurationMs, - recover_time_ms = RecoverTimeMs, - thread_counts = ThreadCountsOpt, - key_ranges = KeyRanges, + recover_time_ms = RecoverTimeMs, + thread_counts = ThreadCountsOpt, + key_ranges = KeyRanges, + init_functions = InitFuns, + nr_of_repeats = NrOfRepeats, scenarios = Scenarios, table_types = TableTypes, etsmem_fun = ETSMemFun, @@ -6872,21 +6949,21 @@ throughput_benchmark( Start = rand:uniform(KeyRange), Last = lists:foldl( - fun(_, Prev) -> + fun(_, Prev) -> case Prev of '$end_of_table'-> ok; _ -> try ets:next(T, Prev) of Normal -> Normal catch - error:badarg -> + error:badarg -> % sets (not ordered_sets) cannot handle when the argument % to next is not in the set rand:uniform(KeyRange) end end end, - Start, + Start, lists:seq(1, SeqSize)), case Last =:= -1 of true -> io:format("Will never be printed"); @@ -6898,26 +6975,26 @@ throughput_benchmark( Start = rand:uniform(KeyRange), Last = Start + SeqSize, case -1 =:= ets:select_count(T, - ets:fun2ms(fun({X}) when X > Start andalso X =< Last -> true end)) of + ets:fun2ms(fun({X}) when X > Start andalso X =< Last -> true end)) of true -> io:format("Will never be printed"); false -> ok end end, %% Mapping benchmark operation names to their corresponding functions that do them - Operations = + Operations = #{insert => - fun(T,KeyRange) -> + fun(T,KeyRange) -> Num = rand:uniform(KeyRange), ets:insert(T, {Num}) end, delete => - fun(T,KeyRange) -> + fun(T,KeyRange) -> Num = rand:uniform(KeyRange), ets:delete(T, Num) end, lookup => - fun(T,KeyRange) -> + fun(T,KeyRange) -> Num = rand:uniform(KeyRange), ets:lookup(T, Num) end, @@ -6928,8 +7005,8 @@ throughput_benchmark( nextseq1000 => fun(T,KeyRange) -> NextSeqOp(T,KeyRange,1000) end, selectAll => - fun(T,_KeyRange) -> - case -1 =:= ets:select_count(T, ets:fun2ms(fun(_X) -> true end)) of + fun(T,_KeyRange) -> + case -1 =:= ets:select_count(T, ets:fun2ms(fun(_X) -> true end)) of true -> io:format("Will never be printed"); false -> ok end @@ -6951,7 +7028,7 @@ throughput_benchmark( NewCurrent = Current + OpPropability, [{NewCurrent, OpName}| Calculate(Res, NewCurrent)] end, - RenderScenario = + RenderScenario = fun R([], StringSoFar) -> StringSoFar; R([{Fraction, Operation}], StringSoFar) -> @@ -6978,7 +7055,7 @@ throughput_benchmark( false -> ok end end, - DataHolder = + DataHolder = fun DataHolderFun(Data)-> receive {get_data, Pid} -> Pid ! {ets_bench_data, Data}; @@ -6992,18 +7069,21 @@ throughput_benchmark( DataHolderPid ! io_lib:format(Str, List) end, GetData = - fun () -> + fun () -> DataHolderPid ! {get_data, self()}, receive {ets_bench_data, Data} -> Data end end, %% Function that runs a benchmark instance and returns the number %% of operations that were performed RunBenchmark = - fun({NrOfProcs, TableConfig, Scenario, Range, Duration}) -> + fun({NrOfProcs, TableConfig, Scenario, Range, Duration, InitFun}) -> ProbHelpTab = CalculateOpsProbHelpTab(Scenario, 0), Table = ets:new(t, TableConfig), Nobj = Range div 2, - prefill_table(Table, Range, Nobj, fun(K) -> {K} end), + case InitFun of + not_set -> prefill_table(Table, Range, Nobj, fun(K) -> {K} end); + _ -> InitFun(Table, Range, Nobj, fun(K) -> {K} end) + end, Nobj = ets:info(Table, size), SafeFixTableIfRequired(Table, Scenario, true), ParentPid = self(), @@ -7016,12 +7096,14 @@ throughput_benchmark( end, ChildPids = lists:map(fun(_N) ->spawn_link(Worker)end, lists:seq(1, NrOfProcs)), + erlang:garbage_collect(), + timer:sleep(RecoverTimeMs), lists:foreach(fun(Pid) -> Pid ! start end, ChildPids), timer:sleep(Duration), lists:foreach(fun(Pid) -> Pid ! stop end, ChildPids), TotalWorksDone = lists:foldl( - fun(_, Sum) -> - receive + fun(_, Sum) -> + receive Count -> Sum + Count end end, 0, ChildPids), @@ -7032,27 +7114,32 @@ throughput_benchmark( RunBenchmarkInSepProcess = fun(ParameterTuple) -> P = self(), - spawn_link(fun()-> P ! {bench_result, RunBenchmark(ParameterTuple)} end), - Result = receive {bench_result, Res} -> Res end, - timer:sleep(RecoverTimeMs), - Result + Results = + [begin + spawn_link(fun()-> P ! {bench_result, RunBenchmark(ParameterTuple)} end), + receive {bench_result, Res} -> Res end + end || _ <- lists:seq(1, NrOfRepeats)], + lists:sum(Results) / NrOfRepeats end, RunBenchmarkAndReport = fun(ThreadCount, TableType, Scenario, KeyRange, - Duration) -> + Duration, + InitFunName, + InitFun) -> Result = RunBenchmarkInSepProcess({ThreadCount, TableType, Scenario, KeyRange, - Duration}), + Duration, + InitFun}), Throughput = Result/(Duration/1000.0), PrintData("; ~f",[Throughput]), - Name = io_lib:format("Scenario: ~w, Key Range Size: ~w, " + Name = io_lib:format("Scenario: ~s, ~w, Key Range Size: ~w, " "# of Processes: ~w, Table Type: ~w", - [Scenario, KeyRange, ThreadCount, TableType]), + [InitFunName, Scenario, KeyRange, ThreadCount, TableType]), NotifyResFun(Name, Throughput) end, ThreadCounts = @@ -7087,17 +7174,29 @@ throughput_benchmark( PrintData("$~n",[]), lists:foreach( fun(TableType) -> - PrintData("~w ",[TableType]), lists:foreach( - fun(ThreadCount) -> - RunBenchmarkAndReport(ThreadCount, - TableType, - Scenario, - KeyRange, - BenchmarkDurationMs) + fun(InitFunArg) -> + {InitFunName, InitFun} = + case InitFunArg of + {FunName, Fun} -> {FunName, Fun}; + Fun -> {"", Fun} + end, + PrintData("~s,~w ",[InitFunName,TableType]), + lists:foreach( + fun(ThreadCount) -> + RunBenchmarkAndReport(ThreadCount, + TableType, + Scenario, + KeyRange, + BenchmarkDurationMs, + InitFunName, + InitFun) + end, + ThreadCounts), + PrintData("$~n",[]) end, - ThreadCounts), - PrintData("$~n",[]) + InitFuns) + end, TableTypes) end, @@ -7121,7 +7220,7 @@ throughput_benchmark( test_throughput_benchmark(Config) when is_list(Config) -> throughput_benchmark( #ets_throughput_bench_config{ - benchmark_duration_ms = 100, + benchmark_duration_ms = 100, recover_time_ms = 0, thread_counts = [1, erlang:system_info(schedulers)], key_ranges = [50000], @@ -7136,7 +7235,7 @@ long_throughput_benchmark(Config) when is_list(Config) -> recover_time_ms = 1000, thread_counts = [1, N div 2, N], key_ranges = [1000000], - scenarios = + scenarios = [ [ {0.5, insert}, @@ -7171,15 +7270,15 @@ long_throughput_benchmark(Config) when is_list(Config) -> {0.01, partial_select1000} ] ], - table_types = + table_types = [ [ordered_set, public, {write_concurrency, true}, {read_concurrency, true}], [set, public, {write_concurrency, true}, {read_concurrency, true}] ], etsmem_fun = fun etsmem/0, verify_etsmem_fun = fun verify_etsmem/1, - notify_res_fun = - fun(Name, Throughput) -> + notify_res_fun = + fun(Name, Throughput) -> SummaryTable = proplists:get_value(ets_benchmark_result_summary_tab, Config), AddToSummaryCounter = @@ -7209,13 +7308,47 @@ long_throughput_benchmark(Config) when is_list(Config) -> total_throughput_ordered_set) end, ct_event:notify( - #event{name = benchmark_data, + #event{name = benchmark_data, data = [{suite,"ets_bench"}, {name, Name}, {value,Throughput}]}) end }). +%% This function compares the lookup operation's performance for +%% ordered_set ETS tables with and without write_concurrency enabled +%% when the data structures have been populated in parallel and +%% sequentially. +%% +%% The main purpose of this function is to check that the +%% implementation of ordered_set with write_concurrency (CA tree) +%% adapts its structure to contention even when only lookup operations +%% are used. +lookup_catree_par_vs_seq_init_benchmark() -> + N = erlang:system_info(schedulers), + throughput_benchmark( + #ets_throughput_bench_config{ + benchmark_duration_ms = 600000, + recover_time_ms = 1000, + thread_counts = [1, N div 2, N], + key_ranges = [1000000], + init_functions = [{"seq_init", fun prefill_table/4}, + {"par_init", fun prefill_table_parallel/4}], + nr_of_repeats = 1, + scenarios = + [ + [ + {1.0, lookup} + ] + ], + table_types = + [ + [ordered_set, public, {write_concurrency, true}], + [ordered_set, public] + ], + print_result_paths_fun = fun stdout_notify_res/2 + }). + add_lists(L1,L2) -> add_lists(L1,L2,[]). add_lists([],[],Acc) -> |