diff options
| author | Kjell Winblad <[email protected]> | 2019-04-30 14:41:16 +0200 | 
|---|---|---|
| committer | Kjell Winblad <[email protected]> | 2019-06-18 15:44:53 +0200 | 
| commit | f707dc058dc0aa880d6f2604acb7b420b082a69c (patch) | |
| tree | 6a791414fbb6375443f7e9a49c3a150ee3b583b6 /lib/stdlib/test | |
| parent | 6618ce7b6a621e92db72ea4f01f7d38553c8818c (diff) | |
| download | otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.tar.gz otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.tar.bz2 otp-f707dc058dc0aa880d6f2604acb7b420b082a69c.zip | |
ETS ordered_set: Improvements to the CA tree implementation
This commit only affects the implementation of ETS `ordered_set`
tables with the `write_concurrency` option enabled. Such tables are
implemented with a data structure that is called the contention
adapting search tree (CA tree). This commit introduces the following
changes:
* This commit causes a join to be triggered in one randomly selected
  base node in about one of 1000 read unlock calls for base node
  locks. No such joins happened before this commit. Before this
  commit, operations that only acquired looks in read-mode never
  triggered any contention adaptation. Therefore, the CA tree could
  get stuck in a sub-optimal state in certain scenarios. This could
  happen, for example, when a CA tree is first populated with parallel
  inserts (which will cause splits of base nodes) and then only
  read-only operations are applied to the data structure. Benchmark
  results from the
  `ets_SUITE:lookup_catree_par_vs_seq_init_benchmark/0` benchmark
  function (which is included in this commit) shows that this change
  can improve the throughput of the CA tree in the scenario described
  above.
* Read-only operations will now also increase values of statistics
  counters when they detect that they need to wait for other
  operations. Only write operation changed statistics counters before
  this commit. This improves the statistics that the adaptation
  heuristics is based on.
* Additionally, this commit adds an upper and lower limit to the
  contention statistics variables in the base nodes. Such limits did
  not exist before this commit. This should, for example, make the CA
  tree more responsive to contention after long periods of low
  contention.
Diffstat (limited to 'lib/stdlib/test')
| -rw-r--r-- | lib/stdlib/test/ets_SUITE.erl | 225 | 
1 files changed, 179 insertions, 46 deletions
| diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl index dd49288417..09238ae2b4 100644 --- a/lib/stdlib/test/ets_SUITE.erl +++ b/lib/stdlib/test/ets_SUITE.erl @@ -75,7 +75,8 @@  -export([throughput_benchmark/0,           throughput_benchmark/1,           test_throughput_benchmark/1, -         long_throughput_benchmark/1]). +         long_throughput_benchmark/1, +         lookup_catree_par_vs_seq_init_benchmark/0]).  -export([exit_large_table_owner/1,  	 exit_many_large_table_owner/1,  	 exit_many_tables_owner/1, @@ -6728,6 +6729,14 @@ do_work(WorksDoneSoFar, Table, ProbHelpTab, Range, Operations) ->      end.  prefill_table(T, KeyRange, Num, ObjFun) -> +    Parent = self(), +    spawn_link(fun() -> +                       prefill_table_helper(T, KeyRange, Num, ObjFun), +                       Parent ! done +               end), +    receive done -> ok end. + +prefill_table_helper(T, KeyRange, Num, ObjFun) ->      Seed = rand:uniform(KeyRange),      %%io:format("prefill_table: Seed = ~p\n", [Seed]),      RState = unique_rand_start(KeyRange, Seed), @@ -6740,11 +6749,77 @@ prefill_table_loop(T, RS0, N, ObjFun) ->      ets:insert(T, ObjFun(Key)),      prefill_table_loop(T, RS1, N-1, ObjFun). +inserter_proc_starter(T, ToInsert, Parent) -> +    receive +        start -> ok +    end, +    inserter_proc(T, ToInsert, [], Parent, false). + +inserter_proc(T, [], Inserted, Parent, _) -> +    inserter_proc(T, Inserted, [], Parent, true); +inserter_proc(T, [I | ToInsert], Inserted, Parent, CanStop) -> +    Stop = +        case CanStop of +            true -> +                receive +                    stop -> Parent ! stopped +                after 0 -> no_stop +                end; +            false -> no_stop +        end, +    case Stop of +        no_stop -> +            ets:insert(T, I), +            inserter_proc(T, ToInsert, [I | Inserted], Parent, CanStop); +        _ -> ok +    end. + +prefill_table_parallel(T, KeyRange, Num, ObjFun) -> +    Parent = self(), +    spawn_link(fun() -> +                       prefill_table_parallel_helper(T, KeyRange, Num, ObjFun), +                       Parent ! done +               end), +    receive done -> ok end. + +prefill_table_parallel_helper(T, KeyRange, Num, ObjFun) -> +    NrOfSchedulers = erlang:system_info(schedulers), +    Seed = rand:uniform(KeyRange), +    %%io:format("prefill_table: Seed = ~p\n", [Seed]), +    RState = unique_rand_start(KeyRange, Seed), +    InsertMap = prefill_insert_map_loop(T, RState, Num, ObjFun, #{}, NrOfSchedulers), +    Self = self(), +    Pids = [ +        begin +            InserterFun = +                fun() -> +                    inserter_proc_starter(T, ToInsert, Self) +                end, +            spawn_link(InserterFun) +        end +        || ToInsert <- maps:values(InsertMap)], +    [Pid ! start || Pid <- Pids], +    timer:sleep(1000), +    [Pid ! stop || Pid <- Pids], +    [receive stopped -> ok end || _Pid <- Pids]. + +prefill_insert_map_loop(_, _, 0, _, InsertMap, _NrOfSchedulers) -> +    InsertMap; +prefill_insert_map_loop(T, RS0, N, ObjFun, InsertMap, NrOfSchedulers) -> +    {Key, RS1} = unique_rand_next(RS0), +    Sched = N rem NrOfSchedulers, +    PrevInserts = maps:get(Sched, InsertMap, []), +    NewPrevInserts = [ObjFun(Key) | PrevInserts], +    NewInsertMap = maps:put(Sched, NewPrevInserts, InsertMap), +    prefill_insert_map_loop(T, RS1, N-1, ObjFun, NewInsertMap, NrOfSchedulers). +  -record(ets_throughput_bench_config,          {benchmark_duration_ms = 3000,           recover_time_ms = 1000,           thread_counts = not_set,           key_ranges = [1000000], +         init_functions = [fun prefill_table/4], +         nr_of_repeats = 1,           scenarios =               [                [ @@ -6838,7 +6913,7 @@ prefill_table_loop(T, RS0, N, ObjFun) ->           notify_res_fun = fun(_Name, _Throughput) -> ok end,           print_result_paths_fun =               fun(ResultPath, _LatestResultPath) -> -                     Comment =  +                     Comment =                           io_lib:format("<a href=\"file:///~s\">Result visualization</a>",[ResultPath]),                       {comment, Comment}               end @@ -6848,7 +6923,7 @@ stdout_notify_res(ResultPath, LatestResultPath) ->      io:format("Result Location: /~s~n", [ResultPath]),      io:format("Latest Result Location: ~s~n", [LatestResultPath]). -throughput_benchmark() ->  +throughput_benchmark() ->      throughput_benchmark(        #ets_throughput_bench_config{           print_result_paths_fun = fun stdout_notify_res/2}). @@ -6856,9 +6931,11 @@ throughput_benchmark() ->  throughput_benchmark(    #ets_throughput_bench_config{       benchmark_duration_ms  = BenchmarkDurationMs, -     recover_time_ms        = RecoverTimeMs,  -     thread_counts          = ThreadCountsOpt,  -     key_ranges             = KeyRanges,  +     recover_time_ms        = RecoverTimeMs, +     thread_counts          = ThreadCountsOpt, +     key_ranges             = KeyRanges, +     init_functions         = InitFuns, +     nr_of_repeats          = NrOfRepeats,       scenarios              = Scenarios,       table_types            = TableTypes,       etsmem_fun             = ETSMemFun, @@ -6872,21 +6949,21 @@ throughput_benchmark(                  Start = rand:uniform(KeyRange),                  Last =                      lists:foldl( -                      fun(_, Prev) ->  +                      fun(_, Prev) ->                                case Prev of                                    '$end_of_table'-> ok;                                    _ ->                                        try ets:next(T, Prev) of                                             Normal -> Normal                                         catch -                                           error:badarg ->  +                                           error:badarg ->                                                 % sets (not ordered_sets) cannot handle when the argument                                                 % to next is not in the set                                                 rand:uniform(KeyRange)                                         end                                end                        end, -                      Start,  +                      Start,                        lists:seq(1, SeqSize)),                  case Last =:= -1 of                      true -> io:format("Will never be printed"); @@ -6898,26 +6975,26 @@ throughput_benchmark(                  Start = rand:uniform(KeyRange),                  Last = Start + SeqSize,                  case -1 =:= ets:select_count(T, -                                             ets:fun2ms(fun({X}) when X > Start andalso X =< Last  -> true end)) of   +                                             ets:fun2ms(fun({X}) when X > Start andalso X =< Last  -> true end)) of                      true -> io:format("Will never be printed");                      false -> ok                  end          end,      %% Mapping benchmark operation names to their corresponding functions that do them -    Operations =  +    Operations =          #{insert => -              fun(T,KeyRange) ->  +              fun(T,KeyRange) ->                        Num = rand:uniform(KeyRange),                        ets:insert(T, {Num})                end,            delete => -              fun(T,KeyRange) ->  +              fun(T,KeyRange) ->                        Num = rand:uniform(KeyRange),                        ets:delete(T, Num)                end,            lookup => -              fun(T,KeyRange) ->  +              fun(T,KeyRange) ->                        Num = rand:uniform(KeyRange),                        ets:lookup(T, Num)                end, @@ -6928,8 +7005,8 @@ throughput_benchmark(            nextseq1000 =>                fun(T,KeyRange) -> NextSeqOp(T,KeyRange,1000) end,            selectAll => -              fun(T,_KeyRange) ->  -                      case -1 =:= ets:select_count(T, ets:fun2ms(fun(_X) -> true end)) of   +              fun(T,_KeyRange) -> +                      case -1 =:= ets:select_count(T, ets:fun2ms(fun(_X) -> true end)) of                            true -> io:format("Will never be printed");                            false -> ok                        end @@ -6951,7 +7028,7 @@ throughput_benchmark(                  NewCurrent = Current + OpPropability,                  [{NewCurrent, OpName}| Calculate(Res, NewCurrent)]          end, -    RenderScenario =  +    RenderScenario =          fun R([], StringSoFar) ->                  StringSoFar;              R([{Fraction, Operation}], StringSoFar) -> @@ -6978,7 +7055,7 @@ throughput_benchmark(                      false -> ok                  end          end, -    DataHolder =  +    DataHolder =          fun DataHolderFun(Data)->                  receive                      {get_data, Pid} -> Pid ! {ets_bench_data, Data}; @@ -6992,18 +7069,21 @@ throughput_benchmark(                  DataHolderPid ! io_lib:format(Str, List)          end,      GetData = -        fun () ->  +        fun () ->                  DataHolderPid ! {get_data, self()},                  receive {ets_bench_data, Data} -> Data end          end,      %% Function that runs a benchmark instance and returns the number      %% of operations that were performed      RunBenchmark = -        fun({NrOfProcs, TableConfig, Scenario, Range, Duration}) -> +        fun({NrOfProcs, TableConfig, Scenario, Range, Duration, InitFun}) ->                  ProbHelpTab = CalculateOpsProbHelpTab(Scenario, 0),                  Table = ets:new(t, TableConfig),                  Nobj = Range div 2, -                prefill_table(Table, Range, Nobj, fun(K) -> {K} end), +                case InitFun of +                    not_set -> prefill_table(Table, Range, Nobj, fun(K) -> {K} end); +                    _ -> InitFun(Table, Range, Nobj, fun(K) -> {K} end) +                end,                  Nobj = ets:info(Table, size),                  SafeFixTableIfRequired(Table, Scenario, true),                  ParentPid = self(), @@ -7016,12 +7096,14 @@ throughput_benchmark(                      end,                  ChildPids =                      lists:map(fun(_N) ->spawn_link(Worker)end, lists:seq(1, NrOfProcs)), +                erlang:garbage_collect(), +                timer:sleep(RecoverTimeMs),                  lists:foreach(fun(Pid) -> Pid ! start end, ChildPids),                  timer:sleep(Duration),                  lists:foreach(fun(Pid) -> Pid ! stop end, ChildPids),                  TotalWorksDone = lists:foldl( -                                   fun(_, Sum) ->  -                                           receive  +                                   fun(_, Sum) -> +                                           receive                                                 Count -> Sum + Count                                             end                                     end, 0, ChildPids), @@ -7032,27 +7114,32 @@ throughput_benchmark(      RunBenchmarkInSepProcess =          fun(ParameterTuple) ->                  P = self(), -                spawn_link(fun()-> P ! {bench_result, RunBenchmark(ParameterTuple)} end), -                Result = receive {bench_result, Res} -> Res end, -                timer:sleep(RecoverTimeMs), -                Result +                Results = +                    [begin +                         spawn_link(fun()-> P ! {bench_result, RunBenchmark(ParameterTuple)} end), +                         receive {bench_result, Res} -> Res end +                     end || _ <- lists:seq(1, NrOfRepeats)], +                lists:sum(Results) / NrOfRepeats          end,      RunBenchmarkAndReport =          fun(ThreadCount,              TableType,              Scenario,              KeyRange, -            Duration) -> +            Duration, +            InitFunName, +            InitFun) ->                  Result = RunBenchmarkInSepProcess({ThreadCount,                                                     TableType,                                                     Scenario,                                                     KeyRange, -                                                   Duration}), +                                                   Duration, +                                                   InitFun}),                  Throughput = Result/(Duration/1000.0),                  PrintData("; ~f",[Throughput]), -                Name = io_lib:format("Scenario: ~w, Key Range Size: ~w, " +                Name = io_lib:format("Scenario: ~s, ~w, Key Range Size: ~w, "                                       "# of Processes: ~w, Table Type: ~w", -                                     [Scenario, KeyRange, ThreadCount, TableType]), +                                     [InitFunName, Scenario, KeyRange, ThreadCount, TableType]),                  NotifyResFun(Name, Throughput)          end,      ThreadCounts = @@ -7087,17 +7174,29 @@ throughput_benchmark(                          PrintData("$~n",[]),                          lists:foreach(                            fun(TableType) -> -                                  PrintData("~w ",[TableType]),                                    lists:foreach( -                                    fun(ThreadCount) -> -                                            RunBenchmarkAndReport(ThreadCount, -                                                                  TableType, -                                                                  Scenario, -                                                                  KeyRange, -                                                                  BenchmarkDurationMs) +                                    fun(InitFunArg) -> +                                            {InitFunName, InitFun} = +                                                case InitFunArg of +                                                    {FunName, Fun} -> {FunName, Fun}; +                                                    Fun -> {"", Fun} +                                                end, +                                            PrintData("~s,~w ",[InitFunName,TableType]), +                                            lists:foreach( +                                              fun(ThreadCount) -> +                                                      RunBenchmarkAndReport(ThreadCount, +                                                                            TableType, +                                                                            Scenario, +                                                                            KeyRange, +                                                                            BenchmarkDurationMs, +                                                                            InitFunName, +                                                                            InitFun) +                                              end, +                                              ThreadCounts), +                                            PrintData("$~n",[])                                      end, -                                    ThreadCounts), -                                  PrintData("$~n",[]) +                                    InitFuns) +                            end,                            TableTypes)                  end, @@ -7121,7 +7220,7 @@ throughput_benchmark(  test_throughput_benchmark(Config) when is_list(Config) ->      throughput_benchmark(        #ets_throughput_bench_config{ -         benchmark_duration_ms = 100,  +         benchmark_duration_ms = 100,           recover_time_ms = 0,           thread_counts = [1, erlang:system_info(schedulers)],           key_ranges = [50000], @@ -7136,7 +7235,7 @@ long_throughput_benchmark(Config) when is_list(Config) ->           recover_time_ms = 1000,           thread_counts = [1, N div 2, N],           key_ranges = [1000000], -         scenarios =  +         scenarios =               [                [                 {0.5, insert}, @@ -7171,15 +7270,15 @@ long_throughput_benchmark(Config) when is_list(Config) ->                 {0.01, partial_select1000}                ]               ], -         table_types =  +         table_types =               [                [ordered_set, public, {write_concurrency, true}, {read_concurrency, true}],                [set, public, {write_concurrency, true}, {read_concurrency, true}]               ],           etsmem_fun = fun etsmem/0,           verify_etsmem_fun = fun verify_etsmem/1, -         notify_res_fun =  -             fun(Name, Throughput) ->  +         notify_res_fun = +             fun(Name, Throughput) ->                       SummaryTable =                           proplists:get_value(ets_benchmark_result_summary_tab, Config),                       AddToSummaryCounter = @@ -7209,13 +7308,47 @@ long_throughput_benchmark(Config) when is_list(Config) ->                                      total_throughput_ordered_set)                       end,                       ct_event:notify( -                          #event{name = benchmark_data,  +                          #event{name = benchmark_data,                                   data = [{suite,"ets_bench"},                                           {name, Name},                                           {value,Throughput}]})               end          }). +%% This function compares the lookup operation's performance for +%% ordered_set ETS tables with and without write_concurrency enabled +%% when the data structures have been populated in parallel and +%% sequentially. +%% +%% The main purpose of this function is to check that the +%% implementation of ordered_set with write_concurrency (CA tree) +%% adapts its structure to contention even when only lookup operations +%% are used. +lookup_catree_par_vs_seq_init_benchmark() -> +    N = erlang:system_info(schedulers), +    throughput_benchmark( +      #ets_throughput_bench_config{ +         benchmark_duration_ms = 600000, +         recover_time_ms = 1000, +         thread_counts = [1, N div 2, N], +         key_ranges = [1000000], +         init_functions = [{"seq_init", fun prefill_table/4}, +                           {"par_init", fun prefill_table_parallel/4}], +         nr_of_repeats = 1, +         scenarios = +             [ +              [ +               {1.0, lookup} +              ] +             ], +         table_types = +             [ +              [ordered_set, public, {write_concurrency, true}], +              [ordered_set, public] +             ], +          print_result_paths_fun = fun stdout_notify_res/2 +        }). +  add_lists(L1,L2) ->      add_lists(L1,L2,[]).  add_lists([],[],Acc) -> | 
