diff options
author | Kjell Winblad <[email protected]> | 2019-02-22 16:49:37 +0100 |
---|---|---|
committer | Kjell Winblad <[email protected]> | 2019-04-10 15:42:42 +0200 |
commit | c5e9766712436bea2b91bccd062f66a3ad1841bb (patch) | |
tree | 25ce2b16b9f847088997b0d4e77af46dbb6d2d66 /lib/stdlib | |
parent | 326c3cb70c1b37c794b781a42c50725766098810 (diff) | |
download | otp-c5e9766712436bea2b91bccd062f66a3ad1841bb.tar.gz otp-c5e9766712436bea2b91bccd062f66a3ad1841bb.tar.bz2 otp-c5e9766712436bea2b91bccd062f66a3ad1841bb.zip |
Decentralized counters for ETS ordered_set with write_concurrency
Previously, all ETS tables used centralized counter variables to keep
track of the number of items stored and the amount of memory
consumed. These counters can cause scalability problems (especially on
big NUMA systems). This commit adds an implementation of a
decentralized counter and modifies the implementation of ETS so that
ETS tables of type ordered_set with write_concurrency enabled use the
decentralized counter. [Experiments][1] indicate that this change
substantially improves the scalability of ETS ordered_set tables with
write_concurrency enabled in scenarios with frequent `ets:insert/2`
and `ets:delete/2` calls.
The new counter is implemented in the module erts_flxctr
(`erts_flxctr.h` and `erts_flxctr.c`). The module has the suffix
flxctr as it contains the implementation of a flexible counter (i.e.,
counter instances can be configured to be either centralized or
decentralized). Counters that are configured to be centralized are
implemented with a single counter variable which is modified with
atomic operations. Decentralized counters are spread over several
cache lines (how many can be configured with the parameter
`+dcg`). The scheduler threads are mapped to cache lines so that there
is no single point of contention when decentralized counters are
updated. The thread progress functionality of the Erlang VM is
utilized to implement support for linearizable snapshots of
decentralized counters. The snapshot functionality is used by the
`ets:info/1` and `ets:info/2` functions.
[1]: http://winsh.me/ets_catree_benchmark/flxctr_res.html
Diffstat (limited to 'lib/stdlib')
-rw-r--r-- | lib/stdlib/test/ets_SUITE.erl | 140 |
1 files changed, 138 insertions, 2 deletions
diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl index 87ca9bd32c..cd32f25abd 100644 --- a/lib/stdlib/test/ets_SUITE.erl +++ b/lib/stdlib/test/ets_SUITE.erl @@ -42,6 +42,8 @@ select_bound_chunk/1, t_delete_all_objects/1, t_insert_list/1, t_test_ms/1, t_select_delete/1,t_select_replace/1,t_select_replace_next_bug/1,t_ets_dets/1]). +-export([test_table_size_concurrency/1,test_table_memory_concurrency/1, + test_delete_table_while_size_snapshot/1, test_delete_table_while_size_snapshot_helper/0]). -export([ordered/1, ordered_match/1, interface_equality/1, fixtable_next/1, fixtable_insert/1, rename/1, rename_unnamed/1, evil_rename/1, @@ -156,7 +158,11 @@ all() -> whereis_table, delete_unfix_race, test_throughput_benchmark, - {group, benchmark}]. + {group, benchmark}, + test_table_size_concurrency, + test_table_memory_concurrency, + test_delete_table_while_size_snapshot]. + groups() -> [{new, [], @@ -828,7 +834,11 @@ adjust_xmem([_T1,_T2,_T3,_T4], {A0,B0,C0,D0} = _Mem0, EstCnt) -> {TabSz, EstSz} = erts_debug:get_internal_state('DbTable_words'), HTabSz = TabSz + EstCnt*EstSz, - {A0+TabSz, B0+HTabSz, C0+HTabSz, D0+HTabSz}. + OrdSetExtra = case erlang:system_info(wordsize) of + 8 -> 40; % larger stack on 64 bit architectures + _ -> 0 + end, + {A0+TabSz+OrdSetExtra, B0+HTabSz, C0+HTabSz, D0+HTabSz}. %% Misc. whitebox tests t_whitebox(Config) when is_list(Config) -> @@ -4102,6 +4112,11 @@ slot_do(Opts) -> fill_tab(Tab,foo), Elts = ets:info(Tab,size), Elts = slot_loop(Tab,0,0), + case ets:info(Tab, type) of + ordered_set -> + '$end_of_table' = ets:slot(Tab,Elts); + _ -> ok + end, true = ets:delete(Tab), verify_etsmem(EtsMem). @@ -4453,6 +4468,127 @@ info_do(Opts) -> undefined = ets:info(non_existing_table_xxyy,safe_fixed), verify_etsmem(EtsMem). +size_loop(_T, 0, _, _) -> + ok; +size_loop(T, I, PrevSize, WhatToTest) -> + Size = ets:info(T, WhatToTest), + case Size < PrevSize of + true -> ct:fail("Bad ets:info/2"); + _ -> ok + end, + size_loop(T, I -1, Size, WhatToTest). + +add_loop(_T, 0) -> + ok; +add_loop(T, I) -> + ets:insert(T, {I}), + add_loop(T, I -1). + + +test_table_counter_concurrency(WhatToTest) -> + ItemsToAdd = 1000000, + SizeLoopSize = 1000, + T = ets:new(k, [public, ordered_set, {write_concurrency, true}]), + 0 = ets:info(T, size), + P = self(), + SpawnedSizeProcs = + [spawn(fun() -> + size_loop(T, SizeLoopSize, 0, WhatToTest), + P ! done + end) + || _ <- lists:seq(1, 6)], + spawn(fun() -> + add_loop(T, ItemsToAdd), + P ! done_add + end), + [receive + done -> ok; + done_add -> ok + end + || _ <- [ok|SpawnedSizeProcs]], + case WhatToTest =:= size of + true -> + ItemsToAdd = ets:info(T, size); + _ -> + ok + end, + ok. + +test_table_size_concurrency(Config) when is_list(Config) -> + test_table_counter_concurrency(size). + +test_table_memory_concurrency(Config) when is_list(Config) -> + test_table_counter_concurrency(memory). + +%% Tests that calling the ets:delete operation on a table T with +%% decentralized counters works while ets:info(T, size) operations are +%% active +test_delete_table_while_size_snapshot(Config) when is_list(Config) -> + %% Run test case in a slave node as other test suites in stdlib + %% depend on that pids are ordered in creation order which is no + %% longer the case when many processes have been started before + Node = start_slave(), + ok = rpc:call(Node, ?MODULE, test_delete_table_while_size_snapshot_helper, []), + test_server:stop_node(Node), + ok. + +test_delete_table_while_size_snapshot_helper()-> + TopParent = self(), + repeat_par( + fun() -> + Table = ets:new(t, [public, ordered_set, + {write_concurrency, true}]), + Parent = self(), + NrOfSizeProcs = 100, + Pids = [ spawn(fun()-> size_process(Table, Parent) end) + || _ <- lists:seq(1, NrOfSizeProcs)], + timer:sleep(1), + ets:delete(Table), + [receive + table_gone -> ok; + Problem -> TopParent ! Problem + end || _ <- Pids] + end, + 15000), + receive + Problem -> throw(Problem) + after 0 -> ok + end. + +size_process(Table, Parent) -> + try ets:info(Table, size) of + N when is_integer(N) -> + size_process(Table, Parent); + undefined -> Parent ! table_gone; + E -> Parent ! {got_unexpected, E} + catch + E -> Parent ! {got_unexpected_exception, E} + end. + +start_slave() -> + MicroSecs = erlang:monotonic_time(), + Name = "ets_" ++ integer_to_list(MicroSecs), + Pa = filename:dirname(code:which(?MODULE)), + {ok, Node} = test_server:start_node(list_to_atom(Name), slave, [{args, "-pa " ++ Pa}]), + Node. + +repeat_par(FunToRepeat, NrOfTimes) -> + repeat_par_help(FunToRepeat, NrOfTimes, NrOfTimes). + +repeat_par_help(_FunToRepeat, 0, OrgNrOfTimes) -> + repeat(fun()-> receive done -> ok end end, OrgNrOfTimes); +repeat_par_help(FunToRepeat, NrOfTimes, OrgNrOfTimes) -> + Parent = self(), + case NrOfTimes rem 5 of + 0 -> timer:sleep(1); + _ -> ok + end, + spawn(fun()-> + FunToRepeat(), + Parent ! done + end), + repeat_par_help(FunToRepeat, NrOfTimes-1, OrgNrOfTimes). + %% Test various duplicate_bags stuff. dups(Config) when is_list(Config) -> repeat_for_opts(fun dups_do/1). |