diff options
author | Sverker Eriksson <[email protected]> | 2018-11-21 15:54:17 +0100 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-11-21 15:54:17 +0100 |
commit | f504781a136b1992a1cee26a7da841892d796d91 (patch) | |
tree | e0b5268ab1cfdcf229154fb316200fbd59a9782e /erts/emulator | |
parent | fefb5d039e87ff7137e78b3d5f2eaf01e498ec4d (diff) | |
download | otp-f504781a136b1992a1cee26a7da841892d796d91.tar.gz otp-f504781a136b1992a1cee26a7da841892d796d91.tar.bz2 otp-f504781a136b1992a1cee26a7da841892d796d91.zip |
erts: Add counters:put/3
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/bif.tab | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_counters.c | 50 | ||||
-rw-r--r-- | erts/emulator/test/counters_SUITE.erl | 81 |
3 files changed, 117 insertions, 15 deletions
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index aa3c3acd9f..d4ba90a61a 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -723,4 +723,5 @@ bif atomics:info/1 bif erts_internal:counters_new/1 bif erts_internal:counters_get/2 bif erts_internal:counters_add/3 +bif erts_internal:counters_put/3 bif erts_internal:counters_info/1 diff --git a/erts/emulator/beam/erl_bif_counters.c b/erts/emulator/beam/erl_bif_counters.c index a46b462225..001e643857 100644 --- a/erts/emulator/beam/erl_bif_counters.c +++ b/erts/emulator/beam/erl_bif_counters.c @@ -19,7 +19,7 @@ */ /* - * Purpose: High performance atomics. + * Purpose: The implementation for 'counters' with 'write_concurrency'. */ #ifdef HAVE_CONFIG_H @@ -37,6 +37,15 @@ #include "erl_bif_unique.h" #include "erl_map.h" +/* + * Each logical counter consists of one 64-bit atomic instance per scheduler + * plus one instance for the "base value". + * + * get() reads all atomics for the counter and return the sum. + * add() reads and writes only its own scheduler specific atomic instance. + * put() reads all scheduler specific atomics and writes a new base value. + */ +#define ATOMICS_PER_COUNTER (erts_no_schedulers + 1) #define COUNTERS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t)) @@ -52,7 +61,7 @@ typedef struct } u[1]; }CountersRef; -static int counters_destructor(Binary *unused) +static int counters_destructor(Binary *mbin) { return 1; } @@ -76,10 +85,10 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } - if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*erts_no_schedulers))) + if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*ATOMICS_PER_COUNTER))) BIF_ERROR(BIF_P, SYSTEM_LIMIT); - cache_lines = erts_no_schedulers * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); + cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); bytes = offsetof(CountersRef, u) + cache_lines * ERTS_CACHE_LINE_SIZE; mbin = erts_create_magic_binary_x(bytes, counters_destructor, @@ -87,6 +96,7 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) 0); p = ERTS_MAGIC_BIN_DATA(mbin); p->arity = cnt; + #ifdef DEBUG p->ulen = cache_lines; #endif @@ -120,7 +130,7 @@ static ERTS_INLINE int get_ref_cnt(Eterm ref, Eterm index, UWord ix, ui, vi; if (!get_ref(ref, &p) || !term_to_UWord(index, &ix) || --ix >= p->arity) return 0; - ui = (ix / COUNTERS_PER_CACHE_LINE) * erts_no_schedulers + sched_ix; + ui = (ix / COUNTERS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER; vi = ix % COUNTERS_PER_CACHE_LINE; ASSERT(ui < p->ulen); *pp = p; @@ -134,7 +144,8 @@ static ERTS_INLINE int get_ref_my_cnt(Eterm ref, Eterm index, { ErtsSchedulerData *esdp = erts_get_scheduler_data(); ASSERT(esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp)); - return get_ref_cnt(ref, index, pp, app, esdp->no - 1); + ASSERT(esdp->no > 0 && esdp->no < ATOMICS_PER_COUNTER); + return get_ref_cnt(ref, index, pp, app, esdp->no); } static ERTS_INLINE int get_ref_first_cnt(Eterm ref, Eterm index, @@ -172,7 +183,7 @@ BIF_RETTYPE erts_internal_counters_get_2(BIF_ALIST_2) if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &ap)) { BIF_ERROR(BIF_P, BADARG); } - for (j = erts_no_schedulers; j ; --j) { + for (j = ATOMICS_PER_COUNTER; j ; --j) { acc += erts_atomic64_read_nob(ap); ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); } @@ -194,6 +205,31 @@ BIF_RETTYPE erts_internal_counters_add_3(BIF_ALIST_3) return am_ok; } +BIF_RETTYPE erts_internal_counters_put_3(BIF_ALIST_3) +{ + CountersRef* p; + erts_atomic64_t* first_ap; + erts_atomic64_t* ap; + erts_aint64_t acc; + erts_aint64_t val; + int j; + + if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &first_ap) + || !term_to_Sint64(BIF_ARG_3, &val)) { + BIF_ERROR(BIF_P, BADARG); + } + + ap = first_ap; + acc = 0; + j = ATOMICS_PER_COUNTER - 1; + do { + ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); + acc += erts_atomic64_read_nob(ap); + } while (--j); + erts_atomic64_set_nob(first_ap, val-acc); + + return am_ok; +} BIF_RETTYPE erts_internal_counters_info_1(BIF_ALIST_1) { diff --git a/erts/emulator/test/counters_SUITE.erl b/erts/emulator/test/counters_SUITE.erl index 7de164096b..8cc963e3b9 100644 --- a/erts/emulator/test/counters_SUITE.erl +++ b/erts/emulator/test/counters_SUITE.erl @@ -21,12 +21,13 @@ -include_lib("common_test/include/ct.hrl"). --compile(export_all). +-export([suite/0, all/0]). +-export([basic/1, bad/1, limits/1, indep/1]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> - [basic, bad, limits]. + [basic, bad, limits, indep]. basic(Config) when is_list(Config) -> Size = 10, @@ -53,15 +54,21 @@ basic_do(Ref, Ix) -> 77 = counters:get(Ref, Ix), ok = counters:sub(Ref, Ix, -10), 87 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, 0), + 0 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, 123), + 123 = counters:get(Ref, Ix), + ok = counters:put(Ref, Ix, -321), + -321 = counters:get(Ref, Ix), ok. check_memory(atomics, Memory, Size) -> {_,true} = {Memory, Memory > Size*8}, {_,true} = {Memory, Memory < Size*max_atomic_sz() + 100}; check_memory(write_concurrency, Memory, Size) -> - NScheds = erlang:system_info(schedulers), - {_,true} = {Memory, Memory > NScheds*Size*8}, - {_,true} = {Memory, Memory < NScheds*(Size+7)*max_atomic_sz() + 100}. + NWords = erlang:system_info(schedulers) + 1, + {_,true} = {Memory, Memory > NWords*Size*8}, + {_,true} = {Memory, Memory < NWords*(Size+7)*max_atomic_sz() + 100}. max_atomic_sz() -> case erlang:system_info({wordsize, external}) of @@ -90,23 +97,81 @@ bad(Config) when is_list(Config) -> limits(Config) when is_list(Config) -> + limits_do(counters:new(1,[atomics])), + limits_do(counters:new(1,[write_concurrency])), + ok. + +limits_do(Ref) -> Bits = 64, Max = (1 bsl (Bits-1)) - 1, Min = -(1 bsl (Bits-1)), - Ref = counters:new(1,[]), 0 = counters:get(Ref, 1), - ok = counters:add(Ref, 1, Max), + ok = counters:put(Ref, 1, Max), + Max = counters:get(Ref, 1), ok = counters:add(Ref, 1, 1), Min = counters:get(Ref, 1), ok = counters:sub(Ref, 1, 1), Max = counters:get(Ref, 1), + ok = counters:put(Ref, 1, Min), + Min = counters:get(Ref, 1), IncrMax = (Max bsl 1) bor 1, - ok = counters:sub(Ref, 1, counters:get(Ref, 1)), + ok = counters:put(Ref, 1, 0), ok = counters:add(Ref, 1, IncrMax), -1 = counters:get(Ref, 1), {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, IncrMax+1)), {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)), + {'EXIT',{badarg,_}} = (catch counters:put(Ref, 1, Max+1)), + {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)), + ok. + +%% Verify that independent workers, using different counters +%% within the same array, do not interfere with each other. +indep(Config) when is_list(Config) -> + NScheds = erlang:system_info(schedulers), + Ref = counters:new(NScheds,[write_concurrency]), + Rounds = 100, + Papa = self(), + Pids = [spawn_opt(fun () -> + Val = I*197, + counters:put(Ref, I, Val), + indep_looper(Rounds, Ref, I, Val), + Papa ! {self(), done} + end, + [link, {scheduler, I}]) + || I <- lists:seq(1, NScheds)], + [receive {P,done} -> ok end || P <- Pids], ok. + +indep_looper(0, _, _ , _) -> + ok; +indep_looper(N, Ref, I, Val0) -> + %%io:format("Val0 = ~p\n", [Val0]), + Val0 = counters:get(Ref, I), + Val1 = indep_adder(Ref, I, Val0), + indep_subber(Ref, I, Val1), + Val2 = N*7 + I, + counters:put(Ref, I, Val2), + indep_looper(N-1, Ref, I, Val2). + +indep_adder(Ref, I, Val) when Val < (1 bsl 62) -> + %%io:format("adder Val = ~p\n", [Val]), + Incr = abs(Val div 2) + I + 984735, + counters:add(Ref, I, Incr), + Res = Val + Incr, + Res = counters:get(Ref, I), + indep_adder(Ref, I, Res); +indep_adder(_Ref, _I, Val) -> + Val. + +indep_subber(Ref, I, Val) when Val > -(1 bsl 62) -> + %%io:format("subber Val = ~p\n", [Val]), + Decr = (abs(Val div 2) + I + 725634), + counters:sub(Ref, I, Decr), + Res = Val - Decr, + Res = counters:get(Ref, I), + indep_subber(Ref, I, Res); +indep_subber(_Ref, _I, Val) -> + Val. |