From f504781a136b1992a1cee26a7da841892d796d91 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 21 Nov 2018 15:54:17 +0100 Subject: erts: Add counters:put/3 --- erts/emulator/beam/erl_bif_counters.c | 50 ++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 7 deletions(-) (limited to 'erts/emulator/beam/erl_bif_counters.c') diff --git a/erts/emulator/beam/erl_bif_counters.c b/erts/emulator/beam/erl_bif_counters.c index a46b462225..001e643857 100644 --- a/erts/emulator/beam/erl_bif_counters.c +++ b/erts/emulator/beam/erl_bif_counters.c @@ -19,7 +19,7 @@ */ /* - * Purpose: High performance atomics. + * Purpose: The implementation for 'counters' with 'write_concurrency'. */ #ifdef HAVE_CONFIG_H @@ -37,6 +37,15 @@ #include "erl_bif_unique.h" #include "erl_map.h" +/* + * Each logical counter consists of one 64-bit atomic instance per scheduler + * plus one instance for the "base value". + * + * get() reads all atomics for the counter and return the sum. + * add() reads and writes only its own scheduler specific atomic instance. + * put() reads all scheduler specific atomics and writes a new base value. + */ +#define ATOMICS_PER_COUNTER (erts_no_schedulers + 1) #define COUNTERS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t)) @@ -52,7 +61,7 @@ typedef struct } u[1]; }CountersRef; -static int counters_destructor(Binary *unused) +static int counters_destructor(Binary *mbin) { return 1; } @@ -76,10 +85,10 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } - if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*erts_no_schedulers))) + if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*ATOMICS_PER_COUNTER))) BIF_ERROR(BIF_P, SYSTEM_LIMIT); - cache_lines = erts_no_schedulers * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); + cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, COUNTERS_PER_CACHE_LINE); bytes = offsetof(CountersRef, u) + cache_lines * ERTS_CACHE_LINE_SIZE; mbin = erts_create_magic_binary_x(bytes, counters_destructor, @@ -87,6 +96,7 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1) 0); p = ERTS_MAGIC_BIN_DATA(mbin); p->arity = cnt; + #ifdef DEBUG p->ulen = cache_lines; #endif @@ -120,7 +130,7 @@ static ERTS_INLINE int get_ref_cnt(Eterm ref, Eterm index, UWord ix, ui, vi; if (!get_ref(ref, &p) || !term_to_UWord(index, &ix) || --ix >= p->arity) return 0; - ui = (ix / COUNTERS_PER_CACHE_LINE) * erts_no_schedulers + sched_ix; + ui = (ix / COUNTERS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER; vi = ix % COUNTERS_PER_CACHE_LINE; ASSERT(ui < p->ulen); *pp = p; @@ -134,7 +144,8 @@ static ERTS_INLINE int get_ref_my_cnt(Eterm ref, Eterm index, { ErtsSchedulerData *esdp = erts_get_scheduler_data(); ASSERT(esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp)); - return get_ref_cnt(ref, index, pp, app, esdp->no - 1); + ASSERT(esdp->no > 0 && esdp->no < ATOMICS_PER_COUNTER); + return get_ref_cnt(ref, index, pp, app, esdp->no); } static ERTS_INLINE int get_ref_first_cnt(Eterm ref, Eterm index, @@ -172,7 +183,7 @@ BIF_RETTYPE erts_internal_counters_get_2(BIF_ALIST_2) if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &ap)) { BIF_ERROR(BIF_P, BADARG); } - for (j = erts_no_schedulers; j ; --j) { + for (j = ATOMICS_PER_COUNTER; j ; --j) { acc += erts_atomic64_read_nob(ap); ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); } @@ -194,6 +205,31 @@ BIF_RETTYPE erts_internal_counters_add_3(BIF_ALIST_3) return am_ok; } +BIF_RETTYPE erts_internal_counters_put_3(BIF_ALIST_3) +{ + CountersRef* p; + erts_atomic64_t* first_ap; + erts_atomic64_t* ap; + erts_aint64_t acc; + erts_aint64_t val; + int j; + + if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &first_ap) + || !term_to_Sint64(BIF_ARG_3, &val)) { + BIF_ERROR(BIF_P, BADARG); + } + + ap = first_ap; + acc = 0; + j = ATOMICS_PER_COUNTER - 1; + do { + ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE); + acc += erts_atomic64_read_nob(ap); + } while (--j); + erts_atomic64_set_nob(first_ap, val-acc); + + return am_ok; +} BIF_RETTYPE erts_internal_counters_info_1(BIF_ALIST_1) { -- cgit v1.2.3