diff options
author | Sverker Eriksson <[email protected]> | 2018-11-21 15:54:17 +0100 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-11-21 15:54:17 +0100 |
commit | f504781a136b1992a1cee26a7da841892d796d91 (patch) | |
tree | e0b5268ab1cfdcf229154fb316200fbd59a9782e | |
parent | fefb5d039e87ff7137e78b3d5f2eaf01e498ec4d (diff) | |
download | otp-f504781a136b1992a1cee26a7da841892d796d91.tar.gz otp-f504781a136b1992a1cee26a7da841892d796d91.tar.bz2 otp-f504781a136b1992a1cee26a7da841892d796d91.zip |
erts: Add counters:put/3
-rw-r--r-- | erts/doc/src/counters.xml | 36 | ||||
-rw-r--r-- | erts/emulator/beam/bif.tab | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_counters.c | 50 | ||||
-rw-r--r-- | erts/emulator/test/counters_SUITE.erl | 81 | ||||
-rw-r--r-- | erts/preloaded/ebin/counters.beam | bin | 2808 -> 3140 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/erts_internal.beam | bin | 17508 -> 17632 bytes | |||
-rw-r--r-- | erts/preloaded/src/counters.erl | 14 | ||||
-rw-r--r-- | erts/preloaded/src/erts_internal.erl | 6 |
8 files changed, 168 insertions, 20 deletions
diff --git a/erts/doc/src/counters.xml b/erts/doc/src/counters.xml index 85eedfdadc..ba4a22759f 100644 --- a/erts/doc/src/counters.xml +++ b/erts/doc/src/counters.xml @@ -85,14 +85,22 @@ </item> <tag><c>write_concurrency</c></tag> <item><p>This is an optimization to achieve very efficient concurrent - write operations at the expense of potential read inconsistency and memory - consumption per counter.</p> + <seealso marker="#add/3"><c>add</c></seealso> and <seealso + marker="#sub/3"><c>sub</c></seealso> operations at the expense of potential read + inconsistency and memory consumption per counter.</p> <p>Read operations may see sequentially inconsistent results with regard to concurrent write operations. Even if write operation A is done sequencially before write operation B, a concurrent reader may see any combination of A and B, including only B. A read operation is only guaranteed to see all writes done sequentially before the read. No writes are ever lost, but will eventually all be seen.</p> + <p>The typical use case for <c>write_concurrency</c> is when + concurrent calls to <seealso marker="#add/3"><c>add</c></seealso> and + <seealso marker="#sub/3"><c>sub</c></seealso> toward the same counters + are very frequent, while calls to <seealso marker="#get/2"><c>get</c> + </seealso> and <seealso marker="#put/3"><c>put</c></seealso> are much + less frequent. The lack of absolute read consistency must also be + acceptable.</p> </item> </taglist> </desc> @@ -110,7 +118,8 @@ <name name="add" arity="3"/> <fsummary>Add to counter</fsummary> <desc> - <p>Add <c><anno>Incr</anno></c> to counter.</p> + <p>Add <c><anno>Incr</anno></c> to counter at index + <c><anno>Ix</anno></c>.</p> </desc> </func> @@ -118,7 +127,26 @@ <name name="sub" arity="3"/> <fsummary>Subtract from counter</fsummary> <desc> - <p>Subtract <c><anno>Decr</anno></c> from counter.</p> + <p>Subtract <c><anno>Decr</anno></c> from counter at index + <c><anno>Ix</anno></c>.</p> + </desc> + </func> + + <func> + <name name="put" arity="3"/> + <fsummary>Set counter to value</fsummary> + <desc> + <p>Write <c><anno>Value</anno></c> to counter at index + <c><anno>Ix</anno></c>.</p> + <note> + <p>Despite its name, the <c>write_concurrency</c> optimization does not + improve <c>put</c>. A call to <c>put</c> is a relative heavy + operation compared to the very lightweight and scalable <seealso + marker="#add/3"><c>add</c></seealso> and <seealso marker="#sub/3"> + <c>sub</c></seealso>. The cost for a <c>put</c> with + <c>write_concurrency</c> is lika a <seealso marker="#get/2"><c>get</c> + </seealso> plus a <c>put</c> without <c>write_concurrency</c>.</p> + </note> </desc> </func> diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index aa3c3acd9f..d4ba90a61a 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -723,4 +723,5 @@ bif atomics:info/1 bif erts_internal:counters_new/1 bif erts_internal:counters_get/2 bif erts_internal:counters_add/3 +bif erts_internal:counters_put/3 bif erts_internal:counters_info/1 diff --git a/erts/emulator/beam/erl_bif_counters.c b/erts/emulator/beam/erl_bif_counters.c index a46b462225..001e643857 100644 --- a/erts/emulator/beam/erl_bif_counters.c +++ b/erts/emulator/beam/erl_bif_counters.c @@ -19,7 +19,7 @@ */ /* - * Purpose: High performance atomics. + * Purpose: The implementation for 'counters' with 'write_concurrency'. */ #ifdef HAVE_CONFIG_H @@ -37,6 +37,15 @@ #include "erl_bif_unique.h" #include "erl_map.h" +/* + * Each logical counter consists of one 64-bit atomic instance per scheduler + * plus one instance for the "base value". + * + * get() reads all atomics for the counter and return the sum. + * add() reads and writes only its own scheduler specific atomic instance. + * put() reads all scheduler specific atomics and writes a new base value. + */ +#define ATOMICS_PER_COUNTER (erts_no_schedulers + 1) #define COUNTERS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t)) @@ -52,7 +61,7 @@ typedef struct } u[1]; }CountersRef; -static int counters_destructor(Binary *unused) +static int counters_destructor(Binary *mbin) { return 1; } @@ -76,10 +85,10 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } - if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*erts_no_schedulers))) + if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*ATOMICS_PER_COUNTER))) BIF_ERROR(BIF_P, SYSTEM_LIMIT); - cache_lines = erts_no_schedulers * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); + cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); bytes = offsetof(CountersRef, u) + cache_lines * ERTS_CACHE_LINE_SIZE; mbin = erts_create_magic_binary_x(bytes, counters_destructor, @@ -87,6 +96,7 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) 0); p = ERTS_MAGIC_BIN_DATA(mbin); p->arity = cnt; + #ifdef DEBUG p->ulen = cache_lines; #endif @@ -120,7 +130,7 @@ static ERTS_INLINE int get_ref_cnt(Eterm ref, Eterm index, UWord ix, ui, vi; if (!get_ref(ref, &p) || !term_to_UWord(index, &ix) || --ix >= p->arity) return 0; - ui = (ix / COUNTERS_PER_CACHE_LINE) * erts_no_schedulers + sched_ix; + ui = (ix / COUNTERS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER; vi = ix % COUNTERS_PER_CACHE_LINE; ASSERT(ui < p->ulen); *pp = p; @@ -134,7 +144,8 @@ static ERTS_INLINE int get_ref_my_cnt(Eterm ref, Eterm index, { ErtsSchedulerData *esdp = erts_get_scheduler_data(); ASSERT(esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp)); - return get_ref_cnt(ref, index, pp, app, esdp->no - 1); + ASSERT(esdp->no > 0 && esdp->no < ATOMICS_PER_COUNTER); + return get_ref_cnt(ref, index, pp, app, esdp->no); } static ERTS_INLINE int get_ref_first_cnt(Eterm ref, Eterm index, @@ -172,7 +183,7 @@ BIF_RETTYPE erts_internal_counters_get_2(BIF_ALIST_2) if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &ap)) { BIF_ERROR(BIF_P, BADARG); } - for (j = erts_no_schedulers; j ; --j) { + for (j = ATOMICS_PER_COUNTER; j ; --j) { acc += erts_atomic64_read_nob(ap); ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); } @@ -194,6 +205,31 @@ BIF_RETTYPE erts_internal_counters_add_3(BIF_ALIST_3) return am_ok; } +BIF_RETTYPE erts_internal_counters_put_3(BIF_ALIST_3) +{ + CountersRef* p; + erts_atomic64_t* first_ap; + erts_atomic64_t* ap; + erts_aint64_t acc; + erts_aint64_t val; + int j; + + if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &first_ap) + || !term_to_Sint64(BIF_ARG_3, &val)) { + BIF_ERROR(BIF_P, BADARG); + } + + ap = first_ap; + acc = 0; + j = ATOMICS_PER_COUNTER - 1; + do { + ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); + acc += erts_atomic64_read_nob(ap); + } while (--j); + erts_atomic64_set_nob(first_ap, val-acc); + + return am_ok; +} BIF_RETTYPE erts_internal_counters_info_1(BIF_ALIST_1) { diff --git a/erts/emulator/test/counters_SUITE.erl b/erts/emulator/test/counters_SUITE.erl index 7de164096b..8cc963e3b9 100644 --- a/erts/emulator/test/counters_SUITE.erl +++ b/erts/emulator/test/counters_SUITE.erl @@ -21,12 +21,13 @@ -include_lib("common_test/include/ct.hrl"). --compile(export_all). +-export([suite/0, all/0]). +-export([basic/1, bad/1, limits/1, indep/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> - [basic, bad, limits]. + [basic, bad, limits, indep]. basic(Config) when is_list(Config) -> Size = 10, @@ -53,15 +54,21 @@ basic_do(Ref, Ix) -> 77 = counters:get(Ref, Ix), ok = counters:sub(Ref, Ix, -10), 87 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, 0), + 0 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, 123), + 123 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, -321), + -321 = counters:get(Ref, Ix), ok. check_memory(atomics, Memory, Size) -> {_,true} = {Memory, Memory > Size*8}, {_,true} = {Memory, Memory < Size*max_atomic_sz() + 100}; check_memory(write_concurrency, Memory, Size) -> - NScheds = erlang:system_info(schedulers), - {_,true} = {Memory, Memory > NScheds*Size*8}, - {_,true} = {Memory, Memory < NScheds*(Size+7)*max_atomic_sz() + 100}. + NWords = erlang:system_info(schedulers) + 1, + {_,true} = {Memory, Memory > NWords*Size*8}, + {_,true} = {Memory, Memory < NWords*(Size+7)*max_atomic_sz() + 100}. max_atomic_sz() -> case erlang:system_info({wordsize, external}) of @@ -90,23 +97,81 @@ bad(Config) when is_list(Config) -> limits(Config) when is_list(Config) -> + limits_do(counters:new(1,[atomics])), + limits_do(counters:new(1,[write_concurrency])), + ok. + +limits_do(Ref) -> Bits = 64, Max = (1 bsl (Bits-1)) - 1, Min = -(1 bsl (Bits-1)), - Ref = counters:new(1,[]), 0 = counters:get(Ref, 1), - ok = counters:add(Ref, 1, Max), + ok = counters:put(Ref, 1, Max), + Max = counters:get(Ref, 1), ok = counters:add(Ref, 1, 1), Min = counters:get(Ref, 1), ok = counters:sub(Ref, 1, 1), Max = counters:get(Ref, 1), + ok = counters:put(Ref, 1, Min), + Min = counters:get(Ref, 1), IncrMax = (Max bsl 1) bor 1, - ok = counters:sub(Ref, 1, counters:get(Ref, 1)), + ok = counters:put(Ref, 1, 0), ok = counters:add(Ref, 1, IncrMax), -1 = counters:get(Ref, 1), {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, IncrMax+1)), {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)), + {'EXIT',{badarg,_}} = (catch counters:put(Ref, 1, Max+1)), + {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)), + ok. + +%% Verify that independent workers, using different counters +%% within the same array, do not interfere with each other. +indep(Config) when is_list(Config) -> + NScheds = erlang:system_info(schedulers), + Ref = counters:new(NScheds,[write_concurrency]), + Rounds = 100, + Papa = self(), + Pids = [spawn_opt(fun () -> + Val = I*197, + counters:put(Ref, I, Val), + indep_looper(Rounds, Ref, I, Val), + Papa ! {self(), done} + end, + [link, {scheduler, I}]) + || I <- lists:seq(1, NScheds)], + [receive {P,done} -> ok end || P <- Pids], ok. + +indep_looper(0, _, _ , _) -> + ok; +indep_looper(N, Ref, I, Val0) -> + %%io:format("Val0 = ~p\n", [Val0]), + Val0 = counters:get(Ref, I), + Val1 = indep_adder(Ref, I, Val0), + indep_subber(Ref, I, Val1), + Val2 = N*7 + I, + counters:put(Ref, I, Val2), + indep_looper(N-1, Ref, I, Val2). + +indep_adder(Ref, I, Val) when Val < (1 bsl 62) -> + %%io:format("adder Val = ~p\n", [Val]), + Incr = abs(Val div 2) + I + 984735, + counters:add(Ref, I, Incr), + Res = Val + Incr, + Res = counters:get(Ref, I), + indep_adder(Ref, I, Res); +indep_adder(_Ref, _I, Val) -> + Val. + +indep_subber(Ref, I, Val) when Val > -(1 bsl 62) -> + %%io:format("subber Val = ~p\n", [Val]), + Decr = (abs(Val div 2) + I + 725634), + counters:sub(Ref, I, Decr), + Res = Val - Decr, + Res = counters:get(Ref, I), + indep_subber(Ref, I, Res); +indep_subber(_Ref, _I, Val) -> + Val. diff --git a/erts/preloaded/ebin/counters.beam b/erts/preloaded/ebin/counters.beam Binary files differindex caaa6167e1..4e1a3566f7 100644 --- a/erts/preloaded/ebin/counters.beam +++ b/erts/preloaded/ebin/counters.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex e174f71966..651d5e9d05 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/src/counters.erl b/erts/preloaded/src/counters.erl index 67354f648d..a0e3035e0f 100644 --- a/erts/preloaded/src/counters.erl +++ b/erts/preloaded/src/counters.erl @@ -26,6 +26,7 @@ get/2, add/3, sub/3, + put/3, info/1]). -export_type([counters_ref/0]). @@ -76,6 +77,19 @@ add(_, _, _) -> sub(Ref, Ix, Decr) -> add(Ref, Ix, -Decr). + +-spec put(Ref, Ix, Value) -> ok when + Ref :: counters_ref(), + Ix :: integer(), + Value :: integer(). +put({atomics, Ref}, Ix, Value) -> + atomics:put(Ref, Ix, Value); +put({write_concurrency, Ref}, Ix, Value) -> + erts_internal:counters_put(Ref, Ix, Value); +put(_, _, _) -> + erlang:error(badarg). + + -spec info(Ref) -> Info when Ref :: counters_ref(), Info :: #{'size':=Size, 'memory':=Memory}, diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index d491a505c6..8f29a569f2 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -95,7 +95,7 @@ -export([atomics_new/2]). -export([counters_new/1, counters_get/2, counters_add/3, - counters_info/1]). + counters_put/3, counters_info/1]). %% %% Await result of send to port @@ -719,6 +719,10 @@ counters_get(_Ref, _Ix) -> counters_add(_Ref, _Ix, _Incr) -> erlang:nif_error(undef). +-spec counters_put(reference(), pos_integer(), integer()) -> ok. +counters_put(_Ref, _Ix, _Value) -> + erlang:nif_error(undef). + -spec counters_info(reference()) -> #{}. counters_info(_Ref) -> erlang:nif_error(undef). |