aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/beam_emu.c1
-rw-r--r--erts/emulator/beam/bif.tab1
-rw-r--r--erts/emulator/beam/erl_alloc_util.c4
-rw-r--r--erts/emulator/beam/erl_bif_counters.c58
-rw-r--r--erts/emulator/beam/erl_gc.c12
-rw-r--r--erts/emulator/beam/erl_node_tables.c21
-rw-r--r--erts/emulator/drivers/common/inet_drv.c53
-rw-r--r--erts/emulator/nifs/common/prim_file_nif.c4
-rw-r--r--erts/emulator/test/counters_SUITE.erl138
-rw-r--r--erts/emulator/test/driver_SUITE.erl17
-rw-r--r--erts/emulator/test/driver_SUITE_data/chkio_drv.c49
11 files changed, 302 insertions, 56 deletions
diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c
index ab5920a67e..e909a0b4da 100644
--- a/erts/emulator/beam/beam_emu.c
+++ b/erts/emulator/beam/beam_emu.c
@@ -579,6 +579,7 @@ init_emulator(void)
* the instructions' C labels to the loader.
* The second call starts execution of BEAM code. This call never returns.
*/
+ERTS_NO_RETPOLINE
void process_main(Eterm * x_reg_array, FloatDef* f_reg_array)
{
static int init_done = 0;
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index aa3c3acd9f..d4ba90a61a 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -723,4 +723,5 @@ bif atomics:info/1
bif erts_internal:counters_new/1
bif erts_internal:counters_get/2
bif erts_internal:counters_add/3
+bif erts_internal:counters_put/3
bif erts_internal:counters_info/1
diff --git a/erts/emulator/beam/erl_alloc_util.c b/erts/emulator/beam/erl_alloc_util.c
index a5740a08cf..0be4562785 100644
--- a/erts/emulator/beam/erl_alloc_util.c
+++ b/erts/emulator/beam/erl_alloc_util.c
@@ -834,6 +834,8 @@ static ERTS_INLINE void clr_bit(UWord* map, Uint ix)
&= ~((UWord)1 << (ix % ERTS_VSPACE_WORD_BITS));
}
+#ifdef DEBUG
+
static ERTS_INLINE int is_bit_set(UWord* map, Uint ix)
{
ASSERT(ix / ERTS_VSPACE_WORD_BITS < VSPACE_MAP_SZ);
@@ -841,6 +843,8 @@ static ERTS_INLINE int is_bit_set(UWord* map, Uint ix)
& ((UWord)1 << (ix % ERTS_VSPACE_WORD_BITS));
}
+#endif
+
UWord erts_literal_vspace_map[VSPACE_MAP_SZ];
static void set_literal_range(void* start, Uint size)
diff --git a/erts/emulator/beam/erl_bif_counters.c b/erts/emulator/beam/erl_bif_counters.c
index a46b462225..7c8884ba32 100644
--- a/erts/emulator/beam/erl_bif_counters.c
+++ b/erts/emulator/beam/erl_bif_counters.c
@@ -19,7 +19,7 @@
*/
/*
- * Purpose: High performance atomics.
+ * Purpose: The implementation for 'counters' with 'write_concurrency'.
*/
#ifdef HAVE_CONFIG_H
@@ -37,8 +37,17 @@
#include "erl_bif_unique.h"
#include "erl_map.h"
+/*
+ * Each logical counter consists of one 64-bit atomic instance per scheduler
+ * plus one instance for the "base value".
+ *
+ * get() reads all atomics for the counter and returns the sum.
+ * add() reads and writes only its own scheduler specific atomic instance.
+ * put() reads all scheduler specific atomics and writes a new base value.
+ */
+#define ATOMICS_PER_COUNTER (erts_no_schedulers + 1)
-#define COUNTERS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t))
+#define ATOMICS_PER_CACHE_LINE (ERTS_CACHE_LINE_SIZE / sizeof(erts_atomic64_t))
typedef struct
{
@@ -47,12 +56,12 @@ typedef struct
UWord ulen;
#endif
union {
- erts_atomic64_t v[COUNTERS_PER_CACHE_LINE];
+ erts_atomic64_t v[ATOMICS_PER_CACHE_LINE];
byte cache_line__[ERTS_CACHE_LINE_SIZE];
} u[1];
}CountersRef;
-static int counters_destructor(Binary *unused)
+static int counters_destructor(Binary *mbin)
{
return 1;
}
@@ -76,10 +85,10 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1)
BIF_ERROR(BIF_P, BADARG);
}
- if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*erts_no_schedulers)))
+ if (cnt > (ERTS_UWORD_MAX / (sizeof(erts_atomic64_t)*2*ATOMICS_PER_COUNTER)))
BIF_ERROR(BIF_P, SYSTEM_LIMIT);
- cache_lines = erts_no_schedulers * div_ceil(cnt, COUNTERS_PER_CACHE_LINE);
+ cache_lines = ATOMICS_PER_COUNTER * div_ceil(cnt, ATOMICS_PER_CACHE_LINE);
bytes = offsetof(CountersRef, u) + cache_lines * ERTS_CACHE_LINE_SIZE;
mbin = erts_create_magic_binary_x(bytes,
counters_destructor,
@@ -87,12 +96,13 @@ BIF_RETTYPE erts_internal_counters_new_1(BIF_ALIST_1)
0);
p = ERTS_MAGIC_BIN_DATA(mbin);
p->arity = cnt;
+
#ifdef DEBUG
p->ulen = cache_lines;
#endif
ASSERT((byte*)&p->u[cache_lines] <= ((byte*)p + bytes));
for (ui=0; ui < cache_lines; ui++)
- for (vi=0; vi < COUNTERS_PER_CACHE_LINE; vi++)
+ for (vi=0; vi < ATOMICS_PER_CACHE_LINE; vi++)
erts_atomic64_init_nob(&p->u[ui].v[vi], 0);
hp = HAlloc(BIF_P, ERTS_MAGIC_REF_THING_SIZE);
return erts_mk_magic_ref(&hp, &MSO(BIF_P), mbin);
@@ -120,8 +130,8 @@ static ERTS_INLINE int get_ref_cnt(Eterm ref, Eterm index,
UWord ix, ui, vi;
if (!get_ref(ref, &p) || !term_to_UWord(index, &ix) || --ix >= p->arity)
return 0;
- ui = (ix / COUNTERS_PER_CACHE_LINE) * erts_no_schedulers + sched_ix;
- vi = ix % COUNTERS_PER_CACHE_LINE;
+ ui = (ix / ATOMICS_PER_CACHE_LINE) * ATOMICS_PER_COUNTER + sched_ix;
+ vi = ix % ATOMICS_PER_CACHE_LINE;
ASSERT(ui < p->ulen);
*pp = p;
*app = &p->u[ui].v[vi];
@@ -134,7 +144,8 @@ static ERTS_INLINE int get_ref_my_cnt(Eterm ref, Eterm index,
{
ErtsSchedulerData *esdp = erts_get_scheduler_data();
ASSERT(esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp));
- return get_ref_cnt(ref, index, pp, app, esdp->no - 1);
+ ASSERT(esdp->no > 0 && esdp->no < ATOMICS_PER_COUNTER);
+ return get_ref_cnt(ref, index, pp, app, esdp->no);
}
static ERTS_INLINE int get_ref_first_cnt(Eterm ref, Eterm index,
@@ -172,7 +183,7 @@ BIF_RETTYPE erts_internal_counters_get_2(BIF_ALIST_2)
if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &ap)) {
BIF_ERROR(BIF_P, BADARG);
}
- for (j = erts_no_schedulers; j ; --j) {
+ for (j = ATOMICS_PER_COUNTER; j ; --j) {
acc += erts_atomic64_read_nob(ap);
ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE);
}
@@ -194,6 +205,31 @@ BIF_RETTYPE erts_internal_counters_add_3(BIF_ALIST_3)
return am_ok;
}
+BIF_RETTYPE erts_internal_counters_put_3(BIF_ALIST_3)
+{
+ CountersRef* p;
+ erts_atomic64_t* first_ap;
+ erts_atomic64_t* ap;
+ erts_aint64_t acc;
+ erts_aint64_t val;
+ int j;
+
+ if (!get_ref_first_cnt(BIF_ARG_1, BIF_ARG_2, &p, &first_ap)
+ || !term_to_Sint64(BIF_ARG_3, &val)) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ ap = first_ap;
+ acc = 0;
+ j = ATOMICS_PER_COUNTER - 1;
+ do {
+ ap = (erts_atomic64_t*) ((byte*)ap + ERTS_CACHE_LINE_SIZE);
+ acc += erts_atomic64_read_nob(ap);
+ } while (--j);
+ erts_atomic64_set_nob(first_ap, val-acc);
+
+ return am_ok;
+}
BIF_RETTYPE erts_internal_counters_info_1(BIF_ALIST_1)
{
diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c
index 47dd115c82..b4df418cd5 100644
--- a/erts/emulator/beam/erl_gc.c
+++ b/erts/emulator/beam/erl_gc.c
@@ -681,7 +681,7 @@ garbage_collect(Process* p, ErlHeapFragment *live_hf_end,
ErtsMonotonicTime start_time;
ErtsSchedulerData *esdp = erts_proc_sched_data(p);
erts_aint32_t state;
- ERTS_MSACC_PUSH_STATE_M();
+ ERTS_MSACC_PUSH_STATE();
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE);
#endif
@@ -711,7 +711,7 @@ garbage_collect(Process* p, ErlHeapFragment *live_hf_end,
else if (p->live_hf_end != ERTS_INVALID_HFRAG_PTR)
live_hf_end = p->live_hf_end;
- ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_GC);
+ ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_GC);
erts_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC);
if (erts_system_monitor_long_gc != 0)
@@ -759,7 +759,7 @@ garbage_collect(Process* p, ErlHeapFragment *live_hf_end,
gc_trace_end_tag = am_gc_minor_end;
} else {
do_major_collection:
- ERTS_MSACC_SET_STATE_CACHED_M_X(ERTS_MSACC_STATE_GC_FULL);
+ ERTS_MSACC_SET_STATE_CACHED_X(ERTS_MSACC_STATE_GC_FULL);
if (IS_TRACED_FL(p, F_TRACE_GC)) {
trace_gc(p, am_gc_major_start, need, THE_NON_VALUE);
}
@@ -770,7 +770,7 @@ do_major_collection:
p->flags &= ~(F_DIRTY_MAJOR_GC|F_DIRTY_MINOR_GC);
DTRACE2(gc_major_end, pidbuf, reclaimed_now);
gc_trace_end_tag = am_gc_major_end;
- ERTS_MSACC_SET_STATE_CACHED_M_X(ERTS_MSACC_STATE_GC);
+ ERTS_MSACC_SET_STATE_CACHED_X(ERTS_MSACC_STATE_GC);
}
reset_active_writer(p);
@@ -800,7 +800,7 @@ do_major_collection:
/* We have to make sure that we have space for need on the heap */
res = delay_garbage_collection(p, live_hf_end, need, fcalls);
- ERTS_MSACC_POP_STATE_M();
+ ERTS_MSACC_POP_STATE();
return res;
}
@@ -843,7 +843,7 @@ do_major_collection:
FLAGS(p) &= ~(F_FORCE_GC|F_HIBERNATED);
p->live_hf_end = ERTS_INVALID_HFRAG_PTR;
- ERTS_MSACC_POP_STATE_M();
+ ERTS_MSACC_POP_STATE();
#ifdef CHECK_FOR_HOLES
/*
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index f4dc60941a..18ed782ae3 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -421,8 +421,25 @@ static void schedule_delete_dist_entry(DistEntry* dep)
*
* Note that timeouts do not guarantee thread progress.
*/
- erts_schedule_thr_prgr_later_op(start_timer_delete_dist_entry,
- dep, &dep->later_op);
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ if (esdp && !ERTS_SCHEDULER_IS_DIRTY(esdp)) {
+ erts_schedule_thr_prgr_later_op(start_timer_delete_dist_entry,
+ dep, &dep->later_op);
+ } else {
+ /*
+ * Since OTP 20, it's possible that destructor is executed on
+ * a dirty scheduler. Aux work cannot be done on a dirty
+ * scheduler, and scheduling any aux work on a dirty scheduler
+ * makes the scheduler to loop infinitely.
+ * To avoid this, make a spot jump: schedule this function again
+ * on a first normal scheduler. It is guaranteed to be always
+ * online. Since it's a rare event, this shall not pose a big
+ * utilisation hit.
+ */
+ erts_schedule_misc_aux_work(1,
+ (void (*)(void *))schedule_delete_dist_entry,
+ (void *) dep);
+ }
}
static void
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 3195ca3874..4c1ab90c01 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -1320,7 +1320,10 @@ static int tcp_deliver(tcp_descriptor* desc, int len);
static int tcp_shutdown_error(tcp_descriptor* desc, int err);
+#ifdef HAVE_SENDFILE
static int tcp_inet_sendfile(tcp_descriptor* desc);
+static int tcp_sendfile_aborted(tcp_descriptor* desc, int socket_error);
+#endif
static int tcp_inet_output(tcp_descriptor* desc, HANDLE event);
static int tcp_inet_input(tcp_descriptor* desc, HANDLE event);
@@ -10300,12 +10303,11 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
desc->tcp_add_flags |= TCP_ADDF_SENDFILE;
/* See if we can finish sending without selecting & rescheduling. */
- tcp_inet_sendfile(desc);
-
- if(desc->sendfile.length > 0) {
- sock_select(INETP(desc), FD_WRITE, 1);
+ if (tcp_inet_sendfile(desc) == 0) {
+ if(desc->sendfile.length > 0) {
+ sock_select(INETP(desc), FD_WRITE, 1);
+ }
}
-
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
#else
return ctl_error(ENOTSUP, rbuf, rsize);
@@ -10518,6 +10520,7 @@ static int tcp_recv_closed(tcp_descriptor* desc)
#ifdef DEBUG
long port = (long) desc->inet.port; /* Used after driver_exit() */
#endif
+ int blocking_send = 0;
DEBUGF(("tcp_recv_closed(%ld): s=%d, in %s, line %d\r\n",
port, desc->inet.s, __FILE__, __LINE__));
if (IS_BUSY(INETP(desc))) {
@@ -10533,7 +10536,15 @@ static int tcp_recv_closed(tcp_descriptor* desc)
set_busy_port(desc->inet.port, 0);
inet_reply_error_am(INETP(desc), am_closed);
DEBUGF(("tcp_recv_closed(%ld): busy reply 'closed'\r\n", port));
- } else {
+ blocking_send = 1;
+ }
+#ifdef HAVE_SENDFILE
+ if (desc->tcp_add_flags & TCP_ADDF_SENDFILE) {
+ tcp_sendfile_aborted(desc, ENOTCONN);
+ blocking_send = 1;
+ }
+#endif
+ if (!blocking_send) {
/* No blocking send op to reply to right now.
* If next op is a send, make sure it returns {error,closed}
* rather than {error,enotconn}.
@@ -10584,6 +10595,11 @@ static int tcp_recv_error(tcp_descriptor* desc, int err)
set_busy_port(desc->inet.port, 0);
inet_reply_error_am(INETP(desc), am_closed);
}
+#ifdef HAVE_SENDFILE
+ if (desc->tcp_add_flags & TCP_ADDF_SENDFILE) {
+ tcp_sendfile_aborted(desc, err);
+ }
+#endif
if (!desc->inet.active) {
/* We must cancel any timer here ! */
driver_cancel_timer(desc->inet.port);
@@ -11247,27 +11263,31 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
DEBUGF(("driver_failure_eof(%ld) in %s, line %d\r\n",
(long)desc->inet.port, __FILE__, __LINE__));
if (desc->inet.active) {
+ ErlDrvTermData err_atom;
if (show_econnreset) {
tcp_error_message(desc, err);
- tcp_closed_message(desc);
- inet_reply_error(INETP(desc), err);
+ err_atom = error_atom(err);
} else {
- tcp_closed_message(desc);
- inet_reply_error_am(INETP(desc), am_closed);
+ err_atom = am_closed;
}
+ tcp_closed_message(desc);
+ if (!(desc->tcp_add_flags & TCP_ADDF_SENDFILE))
+ inet_reply_error_am(INETP(desc), err_atom);
+
if (desc->inet.exitf)
driver_exit(desc->inet.port, 0);
else
tcp_desc_close(desc);
} else {
tcp_close_check(desc);
- tcp_desc_close(desc);
if (desc->inet.caller) {
- if (show_econnreset)
- inet_reply_error(INETP(desc), err);
- else
- inet_reply_error_am(INETP(desc), am_closed);
+ if (!(desc->tcp_add_flags & TCP_ADDF_SENDFILE)) {
+ if (show_econnreset)
+ inet_reply_error(INETP(desc), err);
+ else
+ inet_reply_error_am(INETP(desc), am_closed);
+ }
}
else {
/* No blocking send op to reply to right now.
@@ -11276,6 +11296,7 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
*/
desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_SEND;
}
+ tcp_desc_close(desc);
/*
* Make sure that the next receive operation gets an {error,closed}
@@ -11788,8 +11809,8 @@ socket_error: {
DEBUGF(("tcp_inet_sendfile(%ld): send errno = %d (errno %d)\r\n",
(long)desc->inet.port, socket_errno, errno));
- result = tcp_send_error(desc, socket_errno);
tcp_sendfile_aborted(desc, socket_errno);
+ result = tcp_send_error(desc, socket_errno);
goto done;
}
diff --git a/erts/emulator/nifs/common/prim_file_nif.c b/erts/emulator/nifs/common/prim_file_nif.c
index ba36a33458..b34bf11205 100644
--- a/erts/emulator/nifs/common/prim_file_nif.c
+++ b/erts/emulator/nifs/common/prim_file_nif.c
@@ -933,7 +933,7 @@ static ERL_NIF_TERM set_permissions_nif(ErlNifEnv *env, int argc, const ERL_NIF_
posix_errno_t posix_errno;
efile_path_t path;
- Uint32 permissions;
+ Uint permissions;
if(argc != 2 || !enif_get_uint(env, argv[1], &permissions)) {
return enif_make_badarg(env);
@@ -952,7 +952,7 @@ static ERL_NIF_TERM set_owner_nif(ErlNifEnv *env, int argc, const ERL_NIF_TERM a
posix_errno_t posix_errno;
efile_path_t path;
- Sint32 uid, gid;
+ Sint uid, gid;
if(argc != 3 || !enif_get_int(env, argv[1], &uid)
|| !enif_get_int(env, argv[2], &gid)) {
diff --git a/erts/emulator/test/counters_SUITE.erl b/erts/emulator/test/counters_SUITE.erl
index 7de164096b..b3f0358c1e 100644
--- a/erts/emulator/test/counters_SUITE.erl
+++ b/erts/emulator/test/counters_SUITE.erl
@@ -21,12 +21,13 @@
-include_lib("common_test/include/ct.hrl").
--compile(export_all).
+-export([suite/0, all/0]).
+-export([basic/1, bad/1, limits/1, indep/1, write_concurrency/1]).
suite() -> [{ct_hooks,[ts_install_cth]}].
all() ->
- [basic, bad, limits].
+ [basic, bad, limits, indep, write_concurrency].
basic(Config) when is_list(Config) ->
Size = 10,
@@ -53,15 +54,21 @@ basic_do(Ref, Ix) ->
77 = counters:get(Ref, Ix),
ok = counters:sub(Ref, Ix, -10),
87 = counters:get(Ref, Ix),
+ ok = counters:put(Ref, Ix, 0),
+ 0 = counters:get(Ref, Ix),
+ ok = counters:put(Ref, Ix, 123),
+ 123 = counters:get(Ref, Ix),
+ ok = counters:put(Ref, Ix, -321),
+ -321 = counters:get(Ref, Ix),
ok.
check_memory(atomics, Memory, Size) ->
{_,true} = {Memory, Memory > Size*8},
{_,true} = {Memory, Memory < Size*max_atomic_sz() + 100};
check_memory(write_concurrency, Memory, Size) ->
- NScheds = erlang:system_info(schedulers),
- {_,true} = {Memory, Memory > NScheds*Size*8},
- {_,true} = {Memory, Memory < NScheds*(Size+7)*max_atomic_sz() + 100}.
+ NWords = erlang:system_info(schedulers) + 1,
+ {_,true} = {Memory, Memory > NWords*Size*8},
+ {_,true} = {Memory, Memory < NWords*(Size+7)*max_atomic_sz() + 100}.
max_atomic_sz() ->
case erlang:system_info({wordsize, external}) of
@@ -90,23 +97,138 @@ bad(Config) when is_list(Config) ->
limits(Config) when is_list(Config) ->
+ limits_do(counters:new(1,[atomics])),
+ limits_do(counters:new(1,[write_concurrency])),
+ ok.
+
+limits_do(Ref) ->
Bits = 64,
Max = (1 bsl (Bits-1)) - 1,
Min = -(1 bsl (Bits-1)),
- Ref = counters:new(1,[]),
0 = counters:get(Ref, 1),
- ok = counters:add(Ref, 1, Max),
+ ok = counters:put(Ref, 1, Max),
+ Max = counters:get(Ref, 1),
ok = counters:add(Ref, 1, 1),
Min = counters:get(Ref, 1),
ok = counters:sub(Ref, 1, 1),
Max = counters:get(Ref, 1),
+ ok = counters:put(Ref, 1, Min),
+ Min = counters:get(Ref, 1),
IncrMax = (Max bsl 1) bor 1,
- ok = counters:sub(Ref, 1, counters:get(Ref, 1)),
+ ok = counters:put(Ref, 1, 0),
ok = counters:add(Ref, 1, IncrMax),
-1 = counters:get(Ref, 1),
{'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, IncrMax+1)),
{'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)),
+ {'EXIT',{badarg,_}} = (catch counters:put(Ref, 1, Max+1)),
+ {'EXIT',{badarg,_}} = (catch counters:add(Ref, 1, Min-1)),
+ ok.
+
+%% Verify that independent workers, using different counters
+%% within the same array, do not interfere with each other.
+indep(Config) when is_list(Config) ->
+ NScheds = erlang:system_info(schedulers),
+ Ref = counters:new(NScheds,[write_concurrency]),
+ Rounds = 100,
+ Papa = self(),
+ Pids = [spawn_opt(fun () ->
+ Val = I*197,
+ counters:put(Ref, I, Val),
+ indep_looper(Rounds, Ref, I, Val),
+ Papa ! {self(), done}
+ end,
+ [link, {scheduler, I}])
+ || I <- lists:seq(1, NScheds)],
+ [receive {P,done} -> ok end || P <- Pids],
ok.
+
+indep_looper(0, _, _ , _) ->
+ ok;
+indep_looper(N, Ref, I, Val0) ->
+ %%io:format("Val0 = ~p\n", [Val0]),
+ Val0 = counters:get(Ref, I),
+ Val1 = indep_adder(Ref, I, Val0),
+ indep_subber(Ref, I, Val1),
+ Val2 = N*7 + I,
+ counters:put(Ref, I, Val2),
+ indep_looper(N-1, Ref, I, Val2).
+
+indep_adder(Ref, I, Val) when Val < (1 bsl 62) ->
+ %%io:format("adder Val = ~p\n", [Val]),
+ Incr = abs(Val div 2) + I + 984735,
+ counters:add(Ref, I, Incr),
+ Res = Val + Incr,
+ Res = counters:get(Ref, I),
+ indep_adder(Ref, I, Res);
+indep_adder(_Ref, _I, Val) ->
+ Val.
+
+indep_subber(Ref, I, Val) when Val > -(1 bsl 62) ->
+ %%io:format("subber Val = ~p\n", [Val]),
+ Decr = (abs(Val div 2) + I + 725634),
+ counters:sub(Ref, I, Decr),
+ Res = Val - Decr,
+ Res = counters:get(Ref, I),
+ indep_subber(Ref, I, Res);
+indep_subber(_Ref, _I, Val) ->
+ Val.
+
+
+
+%% Verify write_concurrency yields correct results.
+write_concurrency(Config) when is_list(Config) ->
+ rand:seed(exs1024s),
+ io:format("*** SEED: ~p ***\n", [rand:export_seed()]),
+ NScheds = erlang:system_info(schedulers),
+ Size = 100,
+ Ref = counters:new(Size,[write_concurrency]),
+ Rounds = 1000,
+ Papa = self(),
+ Pids = [spawn_opt(fun Worker() ->
+ receive
+ {go, Ix, Incr} ->
+ wc_looper(Rounds, Ref, Ix, Incr),
+ Papa ! {self(), done, Rounds*Incr},
+ Worker();
+ stop ->
+ ok
+ end
+ end,
+ [link, {scheduler, N}])
+ || N <- lists:seq(1, NScheds)],
+ [begin
+ Base = rand_log64(),
+ counters:put(Ref, Index, Base),
+ SendList = [{P,{go, Index, rand_log64()}} || P <- Pids],
+ [P ! Msg || {P,Msg} <- SendList],
+ Added = lists:sum([receive {P,done,Contrib} -> Contrib end || P <- Pids]),
+ Result = mask_sint64(Base+Added),
+ {_,Result} = {Result, counters:get(Ref, Index)}
+ end
+ || Index <- lists:seq(1, Size)],
+
+ [begin unlink(P), P ! stop end || P <- Pids],
+ ok.
+
+wc_looper(0, _, _, _) ->
+ ok;
+wc_looper(N, Ref, Ix, Incr) ->
+ counters:add(Ref, Ix, Incr),
+ wc_looper(N-1, Ref, Ix, Incr).
+
+mask_sint64(X) ->
+ SMask = 1 bsl 63,
+ UMask = SMask - 1,
+ (X band UMask) - (X band SMask).
+
+%% A random signed 64-bit integer
+%% with a uniformly distributed number of significant bits.
+rand_log64() ->
+ Uint = round(math:pow(2, rand:uniform()*63)),
+ case rand:uniform(2) of
+ 1 -> -Uint;
+ 2 -> Uint
+ end.
diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl
index 6f5d639d04..bd62708aa7 100644
--- a/erts/emulator/test/driver_SUITE.erl
+++ b/erts/emulator/test/driver_SUITE.erl
@@ -1754,7 +1754,7 @@ smp_select0(Config) ->
ProcFun = fun()-> io:format("Worker ~p starting\n",[self()]),
Port = open_port({spawn, DrvName}, []),
smp_select_loop(Port, 100000),
- sleep(1000), % wait for driver to handle pending events
+ smp_select_done(Port),
true = erlang:port_close(Port),
Master ! {ok,self()},
io:format("Worker ~p finished\n",[self()])
@@ -1784,6 +1784,21 @@ smp_select_loop(Port, N) ->
smp_select_loop(Port, N-1)
end.
+smp_select_done(Port) ->
+ case erlang:port_control(Port, ?CHKIO_SMP_SELECT, "done") of
+ "wait" ->
+ receive
+ {Port, done} ->
+ ok
+ after 10*1000 ->
+ %% Seems we have a lost ready_input event.
+ %% Go ahead anyway, port will crash VM when closed.
+ ok
+ end;
+
+ "ok" -> ok
+ end.
+
smp_select_wait([], _) ->
ok;
smp_select_wait(Pids, TimeoutMsg) ->
diff --git a/erts/emulator/test/driver_SUITE_data/chkio_drv.c b/erts/emulator/test/driver_SUITE_data/chkio_drv.c
index ee8f28e8b1..b9ee155b4b 100644
--- a/erts/emulator/test/driver_SUITE_data/chkio_drv.c
+++ b/erts/emulator/test/driver_SUITE_data/chkio_drv.c
@@ -90,7 +90,7 @@ typedef struct chkio_smp_select {
int next_read;
int next_write;
int first_write;
- enum {Closed, Opened, Selected, Waiting} state;
+ enum {Closed, Opened, Selected, Waiting, WaitingDone} state;
int wasSelected;
unsigned rand_state;
}ChkioSmpSelect;
@@ -292,18 +292,20 @@ stop_steal_aux(ChkioDrvData *cddp)
static void free_smp_select(ChkioSmpSelect* pip, ErlDrvPort port)
{
switch (pip->state) {
+ case WaitingDone:
case Waiting: {
int word;
- fprintf(stderr, "Closing pipe in state Waiting. Event lost?\n");
+ fprintf(stderr, "Closing pipe in state Waiting*. Event lost?\r\n");
for (;;) {
int bytes = read(pip->read_fd, &word, sizeof(word));
if (bytes != sizeof(word)) {
if (bytes != 0) {
- fprintf(stderr, "Failed to read from pipe, bytes=%d, errno=%d\n", bytes, errno);
+ fprintf(stderr, "Failed to read from pipe, bytes=%d, errno=%d\r\n",
+ bytes, errno);
}
break;
}
- fprintf(stderr, "Read from pipe: %d\n", word);
+ fprintf(stderr, "Read from pipe: %d\r\n", word);
}
abort();
}
@@ -318,6 +320,8 @@ static void free_smp_select(ChkioSmpSelect* pip, ErlDrvPort port)
close(pip->write_fd);
pip->state = Closed;
break;
+ case Closed:
+ break;
}
driver_free(pip);
}
@@ -383,6 +387,9 @@ chkio_drv_start(ErlDrvPort port, char *command)
cddp->id = driver_mk_port(port);
cddp->test = CHKIO_STOP;
cddp->test_data = NULL;
+
+ drv_use_singleton.fd_stop_select = -2; /* disable stop_select asserts */
+
return (ErlDrvData) cddp;
#endif
}
@@ -526,7 +533,7 @@ chkio_drv_ready_input(ErlDrvData drv_data, ErlDrvEvent event)
printf("Read event on uninitiated pipe %d\n", fd);
abort();
}
- if (pip->state != Selected && pip->state != Waiting) {
+ if (pip->state != Selected && pip->state != Waiting && pip->state != WaitingDone) {
printf("Read event on pipe in strange state %d\n", pip->state);
abort();
}
@@ -536,9 +543,9 @@ chkio_drv_ready_input(ErlDrvData drv_data, ErlDrvEvent event)
inPipe = (pip->next_write - pip->next_read);
if (inPipe == 0) {
bytes = read(pip->read_fd, &word, sizeof(word));
- printf("Unexpected empty pipe, expected %u -> %u, bytes=%d, word=%d, written=%d\n",
- pip->next_read, pip->next_write-1, bytes, word,
- (pip->next_write - pip->first_write));
+ printf("Unexpected empty pipe: ptr=%p, fds=%d->%d, read bytes=%d, word=%d, written=%d\n",
+ pip, pip->write_fd, pip->read_fd,
+ bytes, word, (pip->next_write - pip->first_write));
/*abort();
Allow unexpected events as it's been seen to be triggered by epoll
on Linux. Most of the time the unwanted events are filtered by
@@ -564,7 +571,20 @@ chkio_drv_ready_input(ErlDrvData drv_data, ErlDrvEvent event)
TRACEF(("Read %d from fd=%d\n", word, fd));
pip->next_read++;
}
- pip->state = Selected; /* not Waiting anymore */
+ if (pip->state == WaitingDone) {
+ if (pip->next_write == pip->next_read) {
+ /* All data read, send {Port, done} */
+ ErlDrvTermData spec[] = {ERL_DRV_PORT, driver_mk_port(cddp->port),
+ ERL_DRV_ATOM, driver_mk_atom("done"),
+ ERL_DRV_TUPLE, 2};
+ erl_drv_output_term(driver_mk_port(cddp->port),
+ spec, sizeof(spec) / sizeof(spec[0]));
+ pip->state = Selected;
+ }
+ }
+ else {
+ pip->state = Selected; /* not Waiting anymore */
+ }
break;
}
case CHKIO_DRV_USE:
@@ -962,6 +982,16 @@ chkio_drv_control(ErlDrvData drv_data,
}
case CHKIO_SMP_SELECT: {
ChkioSmpSelect* pip = (ChkioSmpSelect*) cddp->test_data;
+ if (len == 4 && memcmp(buf, "done", 4) == 0) {
+ if (pip && pip->state == Waiting) {
+ pip->state = WaitingDone;
+ res_str = "wait";
+ }
+ else
+ res_str = "ok";
+ res_len = -1;
+ break;
+ }
if (pip == NULL) {
erl_drv_mutex_lock(smp_pipes_mtx);
if (smp_pipes) {
@@ -1014,7 +1044,6 @@ chkio_drv_control(ErlDrvData drv_data,
if (pip->wasSelected && (op & 1)) {
TRACEF(("%T: Close pipe [%d->%d]\n", cddp->id, pip->write_fd,
pip->read_fd));
- drv_use_singleton.fd_stop_select = -2; /* disable stop_select asserts */
if (driver_select(cddp->port, (ErlDrvEvent)(ErlDrvSInt)pip->read_fd,
DO_READ|ERL_DRV_USE, 0)
|| close(pip->write_fd)) {