aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-11-26 13:45:26 +0100
committerSverker Eriksson <[email protected]>2018-11-26 13:45:26 +0100
commit80ee7211fa2d1ad58e53c438b4615388796fb601 (patch)
treec5cf06dc1f86c2ca1cf3d93a8bf470411b13dd90
parent9db0c37b94cb7eb2b5a50b5e843ec6e7d0b248b2 (diff)
parent1c4242fab0ce77e6081665f61d74413ca11038a8 (diff)
downloadotp-80ee7211fa2d1ad58e53c438b4615388796fb601.tar.gz
otp-80ee7211fa2d1ad58e53c438b4615388796fb601.tar.bz2
otp-80ee7211fa2d1ad58e53c438b4615388796fb601.zip
Merge branch 'maint'
-rw-r--r--erts/emulator/beam/erl_bif_counters.c14
-rw-r--r--erts/emulator/test/counters_SUITE.erl61
2 files changed, 66 insertions, 9 deletions
diff --git a/erts/emulator/beam/erl_bif_counters.c b/erts/emulator/beam/erl_bif_counters.c
index 001e643857..7c8884ba32 100644
--- a/erts/emulator/beam/erl_bif_counters.c
+++ b/erts/emulator/beam/erl_bif_counters.c
@@ -41,13 +41,13 @@
* Each logical counter consists of one 64-bit atomic instance per scheduler
* plus one instance for the "base value".
*
- * get() reads all atomics for the counter and return the sum.
+ * get() reads all atomics for the counter and returns the sum.
* add() reads and writes only its own scheduler specific atomic instance.
* put() reads all scheduler specific atomics and writes a new base value.
*/
#define ATOMICS_PER_COUNTER (erts_no_schedulers + 1)
-#define COUNTERS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t))
+#define ATOMICS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t))
typedef struct
{
@@ -56,7 +56,7 @@ typedef struct
UWord ulen;
#endif
union {
- erts_atomic64_t v[COUNTERS_PER_CACHE_LINE];
+ erts_atomic64_t v[ATOMICS_PER_CACHE_LINE];
byte cache_line__[ERTS_CACHE_LINE_SIZE];
} u[1];
}CountersRef;
@@ -88,7 +88,7 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1)
if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*ATOMICS_PER_COUNTER)))
BIF_ERROR(BIF_P, SYSTEM_LIMIT);
- cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, COUNTERS_PER_CACHE_LINE);
+ cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, ATOMICS_PER_CACHE_LINE);
bytes = offsetof(CountersRef, u) + cache_lines * ERTS_CACHE_LINE_SIZE;
mbin = erts_create_magic_binary_x(bytes,
counters_destructor,
@@ -102,7 +102,7 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1)
#endif
ASSERT((byte*)&p->u[cache_lines] <= ((byte*)p + bytes));
for (ui=0; ui < cache_lines; ui++)
- for (vi=0; vi < COUNTERS_PER_CACHE_LINE; vi++)
+ for (vi=0; vi < ATOMICS_PER_CACHE_LINE; vi++)
erts_atomic64_init_nob(&p->u[ui].v[vi], 0);
hp = HAlloc(BIF_P, ERTS_MAGIC_REF_THING_SIZE);
return erts_mk_magic_ref(&hp, &MSO(BIF_P), mbin);
@@ -130,8 +130,8 @@ static ERTS_INLINE int get_ref_cnt(Eterm ref, Eterm index,
UWord ix, ui, vi;
if (!get_ref(ref, &p) || !term_to_UWord(index, &ix) || --ix >= p->arity)
return 0;
- ui = (ix / COUNTERS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER;
- vi = ix % COUNTERS_PER_CACHE_LINE;
+ ui = (ix / ATOMICS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER + sched_ix;
+ vi = ix % ATOMICS_PER_CACHE_LINE;
ASSERT(ui < p->ulen);
*pp = p;
*app = &p->u[ui].v[vi];
diff --git a/erts/emulator/test/counters_SUITE.erl b/erts/emulator/test/counters_SUITE.erl
index 8cc963e3b9..b3f0358c1e 100644
--- a/erts/emulator/test/counters_SUITE.erl
+++ b/erts/emulator/test/counters_SUITE.erl
@@ -22,12 +22,12 @@
-include_lib("common_test/include/ct.hrl").
-export([suite/0, all/0]).
--export([basic/1, bad/1, limits/1, indep/1]).
+-export([basic/1, bad/1, limits/1, indep/1, write_concurrency/1]).
suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
- [basic, bad, limits, indep].
+ [basic, bad, limits, indep, write_concurrency].
basic(Config) when is_list(Config) ->
Size = 10,
@@ -175,3 +175,60 @@ indep_subber(Ref, I, Val) when Val > -(1 bsl 62) ->
indep_subber(Ref, I, Res);
indep_subber(_Ref, _I, Val) ->
Val.
+
+
+
+%% Verify write_concurrency yields correct results.
+write_concurrency(Config) when is_list(Config) ->
+ rand:seed(exs1024s),
+ io:format("*** SEED: ~p ***\n", [rand:export_seed()]),
+ NScheds = erlang:system_info(schedulers),
+ Size = 100,
+ Ref = counters:new(Size,[write_concurrency]),
+ Rounds = 1000,
+ Papa = self(),
+ Pids = [spawn_opt(fun Worker() ->
+ receive
+ {go, Ix, Incr} ->
+ wc_looper(Rounds, Ref, Ix, Incr),
+ Papa ! {self(), done, Rounds*Incr},
+ Worker();
+ stop ->
+ ok
+ end
+ end,
+ [link, {scheduler, N}])
+ || N <- lists:seq(1, NScheds)],
+ [begin
+ Base = rand_log64(),
+ counters:put(Ref, Index, Base),
+ SendList = [{P,{go, Index, rand_log64()}} || P <- Pids],
+ [P ! Msg || {P,Msg} <- SendList],
+ Added = lists:sum([receive {P,done,Contrib} -> Contrib end || P <- Pids]),
+ Result = mask_sint64(Base+Added),
+ {_,Result} = {Result, counters:get(Ref, Index)}
+ end
+ || Index <- lists:seq(1, Size)],
+
+ [begin unlink(P), P ! stop end || P <- Pids],
+ ok.
+
+wc_looper(0, _, _, _) ->
+ ok;
+wc_looper(N, Ref, Ix, Incr) ->
+ counters:add(Ref, Ix, Incr),
+ wc_looper(N-1, Ref, Ix, Incr).
+
+mask_sint64(X) ->
+ SMask = 1 bsl 63,
+ UMask = SMask - 1,
+ (X band UMask) - (X band SMask).
+
+%% A random signed 64-bit integer
+%% with a uniformly distributed number of significant bits.
+rand_log64() ->
+ Uint = round(math:pow(2, rand:uniform()*63)),
+ case rand:uniform(2) of
+ 1 -> -Uint;
+ 2 -> Uint
+ end.