From ce9cad280e24984bad398f5707a17a8f716ca5d3 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Tue, 9 Nov 2010 23:21:12 +0100 Subject: New test suite containing stress tests of the rwmutex implementation --- erts/emulator/test/Makefile | 1 + erts/emulator/test/mtx_SUITE.erl | 473 +++++++++++++++++ erts/emulator/test/mtx_SUITE_data/Makefile.src | 30 ++ erts/emulator/test/mtx_SUITE_data/mtx_SUITE.c | 692 +++++++++++++++++++++++++ 4 files changed, 1196 insertions(+) create mode 100644 erts/emulator/test/mtx_SUITE.erl create mode 100644 erts/emulator/test/mtx_SUITE_data/Makefile.src create mode 100644 erts/emulator/test/mtx_SUITE_data/mtx_SUITE.c (limited to 'erts') diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index a4c02da626..7259e1b84d 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -83,6 +83,7 @@ MODULES= \ receive_SUITE \ ref_SUITE \ register_SUITE \ + mtx_SUITE \ save_calls_SUITE \ send_term_SUITE \ sensitive_SUITE \ diff --git a/erts/emulator/test/mtx_SUITE.erl b/erts/emulator/test/mtx_SUITE.erl new file mode 100644 index 0000000000..ae77fe4d89 --- /dev/null +++ b/erts/emulator/test/mtx_SUITE.erl @@ -0,0 +1,473 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% Stress tests of rwmutex implementation. +%% +%% Author: Rickard Green +%% +-module(mtx_SUITE). + +%%-define(line_trace,true). + +-include("test_server.hrl"). + +-export([all/1, init_per_suite/1, end_per_suite/1, init_per_testcase/2, fin_per_testcase/2]). + +-export([long_rwlock/1, + hammer_ets_rwlock/1, + hammer_rwlock/1, + hammer_rwlock_check/1, + hammer_tryrwlock/1, + hammer_tryrwlock_check/1, + hammer_sched_long_rwlock/1, + hammer_sched_long_rwlock_check/1, + hammer_sched_long_freqread_rwlock/1, + hammer_sched_long_freqread_rwlock_check/1, + hammer_sched_long_tryrwlock/1, + hammer_sched_long_tryrwlock_check/1, + hammer_sched_long_freqread_tryrwlock/1, + hammer_sched_long_freqread_tryrwlock_check/1, + hammer_sched_rwlock/1, + hammer_sched_rwlock_check/1, + hammer_sched_freqread_rwlock/1, + hammer_sched_freqread_rwlock_check/1, + hammer_sched_tryrwlock/1, + hammer_sched_tryrwlock_check/1, + hammer_sched_freqread_tryrwlock/1, + hammer_sched_freqread_tryrwlock_check/1]). + +init_per_suite(Config) when is_list(Config) -> + DataDir = ?config(data_dir, Config), + Lib = filename:join([DataDir, atom_to_list(?MODULE)]), + ok = erlang:load_nif(Lib, none), + Config. + +end_per_suite(Config) when is_list(Config) -> + Config. + +init_per_testcase(_Case, Config) -> + Dog = ?t:timetrap(?t:minutes(15)), + [{watchdog, Dog}|Config]. + +fin_per_testcase(_Func, Config) -> + Dog = ?config(watchdog, Config), + ?t:timetrap_cancel(Dog). + +all(suite) -> + [ + long_rwlock, + hammer_rwlock_check, + hammer_rwlock, + hammer_tryrwlock_check, + hammer_tryrwlock, + hammer_ets_rwlock, + hammer_sched_long_rwlock_check, + hammer_sched_long_rwlock, + hammer_sched_long_freqread_rwlock_check, + hammer_sched_long_freqread_rwlock, + hammer_sched_long_tryrwlock_check, + hammer_sched_long_tryrwlock, + hammer_sched_long_freqread_tryrwlock_check, + hammer_sched_long_freqread_tryrwlock, + hammer_sched_rwlock_check, + hammer_sched_rwlock, + hammer_sched_freqread_rwlock_check, + hammer_sched_freqread_rwlock, + hammer_sched_tryrwlock_check, + hammer_sched_tryrwlock, + hammer_sched_freqread_tryrwlock_check, + hammer_sched_freqread_tryrwlock + ]. + +long_rwlock(Config) when is_list(Config) -> + statistics(runtime), + LLRes = long_rw_test(), + {_, RunTime} = statistics(runtime), + %% A very short run time is expected, since + %% threads in the test mostly wait + ?t:format("RunTime=~p~n", [RunTime]), + ?line true = RunTime < 100, + ?line RunTimeStr = "Run-time during test was "++integer_to_list(RunTime)++" ms.", + case LLRes of + ok -> + {comment, RunTimeStr}; + {comment, Comment} -> + {comment, Comment ++ " " ++ RunTimeStr} + end. + +hammer_rwlock(Config) when is_list(Config) -> + hammer_rw_test(false). + +hammer_rwlock_check(Config) when is_list(Config) -> + hammer_rw_test(true). + +hammer_tryrwlock(Config) when is_list(Config) -> + hammer_tryrw_test(false). + +hammer_tryrwlock_check(Config) when is_list(Config) -> + hammer_tryrw_test(true). + +hammer_sched_rwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, false, true, 0, 0). + +hammer_sched_rwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, true, true, 0, 0). + +hammer_sched_freqread_rwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, false, true, 0, 0). + +hammer_sched_freqread_rwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, true, true, 0, 0). + +hammer_sched_tryrwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, false, false, 0, 100). + +hammer_sched_tryrwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, true, false, 0, 100). + +hammer_sched_freqread_tryrwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, false, false, 0, 100). + +hammer_sched_freqread_tryrwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, true, false, 0, 100). + +hammer_sched_long_rwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, false, true, 100, 0). + +hammer_sched_long_rwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, true, true, 100, 0). + +hammer_sched_long_freqread_rwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, false, true, 100, 0). + +hammer_sched_long_freqread_rwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, true, true, 100, 0). + +hammer_sched_long_tryrwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, false, false, 100, 100). + +hammer_sched_long_tryrwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(false, true, false, 100, 100). + +hammer_sched_long_freqread_tryrwlock(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, false, false, 100, 100). + +hammer_sched_long_freqread_tryrwlock_check(Config) when is_list(Config) -> + hammer_sched_rwlock_test(true, true, false, 100, 100). + +hammer_sched_rwlock_test(FreqRead, LockCheck, Blocking, WaitLocked, WaitUnlocked) -> + case create_rwlock(FreqRead, LockCheck) of + enotsup -> + {skipped, "Not supported."}; + RWLock -> + Onln = erlang:system_info(schedulers_online), + NWPs = case Onln div 3 of + 1 -> case Onln < 4 of + true -> 1; + false -> 2 + end; + X -> X + end, + NRPs = Onln - NWPs, + NoLockOps = ((((50000000 div Onln) + div case {Blocking, WaitLocked} of + {false, 0} -> 1; + _ -> 10 + end) + div (case WaitLocked == 0 of + true -> 1; + false -> WaitLocked*250 + end)) + div handicap()), + ?t:format("NoLockOps=~p~n", [NoLockOps]), + Sleep = case Blocking of + true -> NoLockOps; + false -> NoLockOps div 10 + end, + WPs = lists:map( + fun (Sched) -> + spawn_opt( + fun () -> + io:format("Writer on scheduler ~p.~n", + [Sched]), + Sched = erlang:system_info(scheduler_id), + receive go -> gone end, + hammer_sched_rwlock_proc(RWLock, + Blocking, + true, + WaitLocked, + WaitUnlocked, + NoLockOps, + Sleep), + Sched = erlang:system_info(scheduler_id) + end, + [link, {scheduler, Sched}]) + end, + lists:seq(1, NWPs)), + RPs = lists:map( + fun (Sched) -> + spawn_opt( + fun () -> + io:format("Reader on scheduler ~p.~n", + [Sched]), + Sched = erlang:system_info(scheduler_id), + receive go -> gone end, + hammer_sched_rwlock_proc(RWLock, + Blocking, + false, + WaitLocked, + WaitUnlocked, + NoLockOps, + Sleep), + Sched = erlang:system_info(scheduler_id) + end, + [link, {scheduler, Sched}]) + end, + lists:seq(NWPs + 1, NWPs + NRPs)), + Procs = WPs ++ RPs, + case {Blocking, WaitLocked} of + {_, 0} -> ok; + {false, _} -> ok; + _ -> statistics(runtime) + end, + lists:foreach(fun (P) -> P ! go end, Procs), + lists:foreach(fun (P) -> + M = erlang:monitor(process, P), + receive + {'DOWN', M, process, P, _} -> + ok + end + end, + Procs), + case {Blocking, WaitLocked} of + {_, 0} -> ok; + {false, _} -> ok; + _ -> + {_, RunTime} = statistics(runtime), + ?t:format("RunTime=~p~n", [RunTime]), + ?line true = RunTime < 500, + {comment, + "Run-time during test was " + ++ integer_to_list(RunTime) + ++ " ms."} + end + end. + +hammer_sched_rwlock_proc(_RWLock, + _Blocking, + _WriteOp, + _WaitLocked, + _WaitUnlocked, + 0, + _Sleep) -> + ok; +hammer_sched_rwlock_proc(RWLock, + Blocking, + WriteOp, + WaitLocked, + WaitUnlocked, + Times, + Sleep) when Times rem Sleep == 0 -> + rwlock_op(RWLock, Blocking, WriteOp, WaitLocked, WaitUnlocked), + hammer_sched_rwlock_proc(RWLock, + Blocking, + WriteOp, + WaitLocked, + WaitUnlocked, + Times - 1, + Sleep); +hammer_sched_rwlock_proc(RWLock, + Blocking, + WriteOp, + WaitLocked, + WaitUnlocked, + Times, + Sleep) -> + rwlock_op(RWLock, Blocking, WriteOp, WaitLocked, 0), + hammer_sched_rwlock_proc(RWLock, + Blocking, + WriteOp, + WaitLocked, + WaitUnlocked, + Times - 1, + Sleep). + +-define(HAMMER_ETS_RWLOCK_REPEAT_TIMES, 1). +-define(HAMMER_ETS_RWLOCK_TSIZE, 500). + +hammer_ets_rwlock(Config) when is_list(Config) -> + {Ops, Procs} = case handicap() of + 1 -> {20000, 500}; + 2 -> {20000, 50}; + 3 -> {2000, 50}; + _ -> {200, 50} + end, + ?t:format("Procs=~p~nOps=~p~n", [Procs, Ops]), + lists:foreach(fun (XOpts) -> + ?t:format("Running with extra opts: ~p", [XOpts]), + hammer_ets_rwlock_test(XOpts, true, 2, Ops, + Procs, false) + end, + [[], + [{read_concurrency, true}], + [{write_concurrency, true}], + [{read_concurrency, true},{write_concurrency, true}]]), + ok. + +%% Aux funcs + +long_rw_test() -> + exit(no_nif_implementation). + +hammer_rw_test(_Arg) -> + exit(no_nif_implementation). + +hammer_tryrw_test(_Arg) -> + exit(no_nif_implementation). + +create_rwlock(_FreqRead, _LockCheck) -> + exit(no_nif_implementation). + +rwlock_op(_RWLock, _Blocking, _WriteOp, _WaitLocked, _WaitUnlocked) -> + exit(no_nif_implementation). + +hammer_ets_rwlock_put_data() -> + put(?MODULE, {"here are some", data, "to store", make_ref()}). + +hammer_ets_rwlock_get_data() -> + get(?MODULE). + +hammer_ets_rwlock_ops(_T, _UW, _N, _C, _SC, 0) -> + ok; +hammer_ets_rwlock_ops(T, UW, N, C, SC, Tot) when N >= ?HAMMER_ETS_RWLOCK_TSIZE -> + hammer_ets_rwlock_ops(T, UW, 0, C, SC, Tot); +hammer_ets_rwlock_ops(T, UW, N, 0, SC, Tot) -> + case UW of + true -> + true = ets:insert(T, {N, Tot, hammer_ets_rwlock_get_data()}); + false -> + [{N, _, _}] = ets:lookup(T, N) + end, + hammer_ets_rwlock_ops(T, UW, N+1, SC, SC, Tot-1); +hammer_ets_rwlock_ops(T, UW, N, C, SC, Tot) -> + case UW of + false -> + true = ets:insert(T, {N, Tot, hammer_ets_rwlock_get_data()}); + true -> + [{N, _, _}] = ets:lookup(T, N) + end, + hammer_ets_rwlock_ops(T, UW, N+1, C-1, SC, Tot-1). + +hammer_ets_rwlock_init(T, N) when N < ?HAMMER_ETS_RWLOCK_TSIZE -> + ets:insert(T, {N, N, N}), + hammer_ets_rwlock_init(T, N+1); +hammer_ets_rwlock_init(_T, _N) -> + ok. + +hammer_ets_rwlock_test(XOpts, UW, C, N, NP, SC) -> + receive after 100 -> ok end, + {TP, TM} = spawn_monitor( + fun () -> + _L = repeat_list( + fun () -> + Caller = self(), + T = fun () -> + Parent = self(), + hammer_ets_rwlock_put_data(), + T=ets:new(x, [public | XOpts]), + hammer_ets_rwlock_init(T, 0), + Ps0 = repeat_list( + fun () -> + spawn_link( + fun () -> + hammer_ets_rwlock_put_data(), + receive go -> ok end, + hammer_ets_rwlock_ops(T, UW, N, C, C, N), + Parent ! {done, self()}, + receive after infinity -> ok end + end) + end, + NP - case SC of + false -> 0; + _ -> 1 + end), + Ps = case SC of + false -> Ps0; + _ -> [spawn_link(fun () -> + hammer_ets_rwlock_put_data(), + receive go -> ok end, + hammer_ets_rwlock_ops(T, UW, N, SC, SC, N), + Parent ! {done, self()}, + receive after infinity -> ok end + end) | Ps0] + end, + Start = now(), + lists:foreach(fun (P) -> P ! go end, Ps), + lists:foreach(fun (P) -> receive {done, P} -> ok end end, Ps), + Stop = now(), + lists:foreach(fun (P) -> + unlink(P), + exit(P, bang), + M = erlang:monitor(process, P), + receive + {'DOWN', M, process, P, _} -> ok + end + end, Ps), + Res = timer:now_diff(Stop, Start)/1000000, + Caller ! {?MODULE, self(), Res} + end, + TP = spawn_link(T), + receive + {?MODULE, TP, Res} -> + Res + end + end, + ?HAMMER_ETS_RWLOCK_REPEAT_TIMES) + end), + receive + {'DOWN', TM, process, TP, _} -> ok + end. + +repeat_list(Fun, N) -> + repeat_list(Fun, N, []). + +repeat_list(_Fun, 0, Acc) -> + Acc; +repeat_list(Fun, N, Acc) -> + repeat_list(Fun, N-1, [Fun()|Acc]). + + +handicap() -> + X0 = case catch (erlang:system_info(logical_processors_available) >= + erlang:system_info(schedulers_online)) of + true -> 1; + _ -> 2 + end, + case erlang:system_info(build_type) of + opt -> + X0; + ReallySlow when ReallySlow == debug; + ReallySlow == valgrind; + ReallySlow == purify -> + X0*3; + _Slow -> + X0*2 + end. + diff --git a/erts/emulator/test/mtx_SUITE_data/Makefile.src b/erts/emulator/test/mtx_SUITE_data/Makefile.src new file mode 100644 index 0000000000..b6c843269c --- /dev/null +++ b/erts/emulator/test/mtx_SUITE_data/Makefile.src @@ -0,0 +1,30 @@ +# +# %CopyrightBegin% +# +# Copyright Ericsson AB 2010. All Rights Reserved. +# +# The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved online at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# %CopyrightEnd% +# + +include @erts_lib_include_internal_generated@@DS@ethread.mk +include @erts_lib_include_internal_generated@@DS@erts_internal.mk + +NIF_LIBS = mtx_SUITE@dll@ + +SHLIB_EXTRA_CFLAGS = $(ETHR_DEFS) -I@erts_lib_include_internal@ -I@erts_lib_include_internal_generated@ +LIBS = @ERTS_LIBS@ + +all: $(NIF_LIBS) + +@SHLIB_RULES@ diff --git a/erts/emulator/test/mtx_SUITE_data/mtx_SUITE.c b/erts/emulator/test/mtx_SUITE_data/mtx_SUITE.c new file mode 100644 index 0000000000..818023211c --- /dev/null +++ b/erts/emulator/test/mtx_SUITE_data/mtx_SUITE.c @@ -0,0 +1,692 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2010. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Stress tests of rwmutex implementation. + * + * Author: Rickard Green + */ + +#include "erl_nif.h" + +#ifdef __WIN32__ +# ifndef WIN32_LEAN_AND_MEAN +# define WIN32_LEAN_AND_MEAN +# endif +# include +#else +# include "ethread.h" +# include "erl_misc_utils.h" +# include +#endif + +#include +#include + +static int +fail(const char *file, int line, const char *function, const char *assertion); + +#undef ASSERT +#define ASSERT(X) ((void) ((X) ? 1 : fail(__FILE__, __LINE__, __func__, #X))) + +#ifdef __WIN32__ +/* + * We cannot access the ethread symbols directly; test + * what we got in the nif api instead... + */ +#define HAVE_FREQREAD_SUPPORT 0 +#define RWMUTEX_T ErlNifRWLock +#define RWMUTEX_CREATE(FR) enif_rwlock_create("dummy") +#define RWMUTEX_DESTROY enif_rwlock_destroy +#define RWMUTEX_WLOCK enif_rwlock_rwlock +#define RWMUTEX_TRYWLOCK enif_rwlock_tryrwlock +#define RWMUTEX_WUNLOCK enif_rwlock_rwunlock +#define RWMUTEX_TRYRLOCK enif_rwlock_tryrlock +#define RWMUTEX_RLOCK enif_rwlock_rlock +#define RWMUTEX_RUNLOCK enif_rwlock_runlock +#define THR_ID ErlNifTid +#define THR_CREATE(A, B, C, D) enif_thread_create("dummy", (A), (B), (C), (D)) +#define THR_JOIN enif_thread_join +#define ATOMIC_T volatile LONG +#define ATOMIC_INIT(VarP, Val) (*(VarP) = (Val)) +#define ATOMIC_SET(VarP, Val) (*(VarP) = (Val)) +#define ATOMIC_READ(VarP) (*(VarP)) +#define ATOMIC_INC InterlockedIncrement +#define ATOMIC_DEC InterlockedDecrement + +#else + +#ifdef ETHR_USE_OWN_RWMTX_IMPL__ +# define HAVE_FREQREAD_SUPPORT 1 +#else +# define HAVE_FREQREAD_SUPPORT 0 +#endif + +#define RWMUTEX_T ethr_rwmutex +static ethr_rwmutex * +RWMUTEX_CREATE(int freqread) +{ + ethr_rwmutex *rwmtx = enif_alloc(sizeof(ethr_rwmutex)); + ethr_rwmutex_opt rwmtx_opt = ETHR_RWMUTEX_OPT_DEFAULT_INITER; + if (freqread) + rwmtx_opt.type = ETHR_RWMUTEX_TYPE_FREQUENT_READ; + ASSERT(rwmtx); + ASSERT(ethr_rwmutex_init_opt(rwmtx, &rwmtx_opt) == 0); + return rwmtx; +} +static void +RWMUTEX_DESTROY(ethr_rwmutex *rwmtx) +{ + ASSERT(ethr_rwmutex_destroy(rwmtx) == 0); + enif_free(rwmtx); +} +#define RWMUTEX_TRYWLOCK ethr_rwmutex_tryrwlock +#define RWMUTEX_WLOCK ethr_rwmutex_rwlock +#define RWMUTEX_WUNLOCK ethr_rwmutex_rwunlock +#define RWMUTEX_TRYRLOCK ethr_rwmutex_tryrlock +#define RWMUTEX_RLOCK ethr_rwmutex_rlock +#define RWMUTEX_RUNLOCK ethr_rwmutex_runlock +#define THR_ID ethr_tid +#define THR_CREATE ethr_thr_create +#define THR_JOIN ethr_thr_join +#define ATOMIC_T ethr_atomic_t +#define ATOMIC_INIT ethr_atomic_init +#define ATOMIC_SET ethr_atomic_set +#define ATOMIC_READ ethr_atomic_read +#define ATOMIC_INC ethr_atomic_inc +#define ATOMIC_DEC ethr_atomic_dec + +#endif + + +#if !defined(__func__) +# if !defined(__STDC_VERSION__) || __STDC_VERSION__ < 199901L +# if !defined(__GNUC__) || __GNUC__ < 2 +# define __func__ "[unknown_function]" +# else +# define __func__ __FUNCTION__ +# endif +# endif +#endif + +static void milli_sleep(int ms); +static int get_bool(ErlNifEnv* env, ERL_NIF_TERM term); + +/* + * Long rwlock testcase + */ + +#define LONG_RW_NO_W_THREADS 6 +#define LONG_RW_NO_THREADS 20 +#define LONG_RW_NO_WLOCK_COUNT 100 + +typedef struct { + RWMUTEX_T *rwlock; + ATOMIC_T *is_wlocked; + ATOMIC_T *is_rlocked; + int *stop; + int *count; + int sleep; +} long_rw_t; + +static void * +long_rw_w(void *varg) +{ + long_rw_t *arg = varg; + int stop = 0; + do { + RWMUTEX_WLOCK(arg->rwlock); + ASSERT(!ATOMIC_READ(arg->is_wlocked)); + ATOMIC_SET(arg->is_wlocked, 1); + ASSERT(!ATOMIC_READ(arg->is_rlocked)); + milli_sleep(arg->sleep); + if (++(*arg->count) > LONG_RW_NO_WLOCK_COUNT) + stop = *arg->stop = 1; + ATOMIC_SET(arg->is_wlocked, 0); + ASSERT(!ATOMIC_READ(arg->is_rlocked)); + RWMUTEX_WUNLOCK(arg->rwlock); + } while (!stop); + return NULL; +} + +static void * +long_rw_r(void *varg) +{ + long_rw_t *arg = varg; + int stop; + do { + RWMUTEX_RLOCK(arg->rwlock); + ASSERT(!ATOMIC_READ(arg->is_wlocked)); + ATOMIC_INC(arg->is_rlocked); + milli_sleep(arg->sleep); + stop = *arg->stop; + ATOMIC_DEC(arg->is_rlocked); + ASSERT(!ATOMIC_READ(arg->is_wlocked)); + RWMUTEX_RUNLOCK(arg->rwlock); + } while (!stop); + return NULL; +} + + +static ERL_NIF_TERM long_rw_test(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + int res, freqread, i, count, stop; + ATOMIC_T is_wlocked, is_rlocked; + THR_ID tid[LONG_RW_NO_THREADS]; + long_rw_t arg; + long_rw_t targ[LONG_RW_NO_THREADS]; + + ATOMIC_INIT(&is_wlocked, 0); + ATOMIC_INIT(&is_rlocked, 0); + + freqread = 0; + + arg.is_wlocked = &is_wlocked; + arg.is_rlocked = &is_rlocked; + arg.count = &count; + arg.stop = &stop; + + restart: + + stop = 0; + count = 0; + + arg.rwlock = RWMUTEX_CREATE(freqread); + + ASSERT(arg.rwlock); + + for (i = 0; i < LONG_RW_NO_W_THREADS; i++) { + targ[i] = arg; + targ[i].sleep = 100 + i*10; + ASSERT(THR_CREATE(&tid[i], long_rw_w, &targ[i], NULL) == 0); + } + for (; i < LONG_RW_NO_THREADS; i++) { + targ[i] = arg; + targ[i].sleep = 100; + ASSERT(THR_CREATE(&tid[i], long_rw_r, &targ[i], NULL) == 0); + } + for (i = 0; i < LONG_RW_NO_THREADS; i++) + ASSERT(THR_JOIN(tid[i], NULL) == 0); + + ASSERT(!ATOMIC_READ(arg.is_wlocked)); + ASSERT(!ATOMIC_READ(arg.is_rlocked)); + + RWMUTEX_DESTROY(arg.rwlock); + + if (HAVE_FREQREAD_SUPPORT && !freqread) { + freqread = 1; + goto restart; + } + + if (freqread) + return enif_make_atom(env, "ok"); + else + return enif_make_tuple2(env, + enif_make_atom(env, + "comment"), + enif_make_string(env, + "No frequent read test made.", + ERL_NIF_LATIN1)); +} + +/* + * Hammer rwlock testcase + */ + +#define HAMMER_RW_NO_W_THREADS 6 +#define HAMMER_RW_NO_THREADS 20 +#define HAMMER_RW_NO_WLOCK_COUNT 1000000 + +typedef struct { + RWMUTEX_T *rwlock; + ATOMIC_T is_locked; + int lock_check; + int stop; + int count; +} hammer_rw_t; + +static void * +hammer_rw_w(void *varg) +{ + hammer_rw_t *arg = varg; + int stop = 0; + do { + RWMUTEX_WLOCK(arg->rwlock); + if (arg->lock_check) { + ASSERT(!ATOMIC_READ(&arg->is_locked)); + ATOMIC_SET(&arg->is_locked, -1); + } + if (++arg->count > HAMMER_RW_NO_WLOCK_COUNT) + stop = arg->stop = 1; + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) == -1); + ATOMIC_SET(&arg->is_locked, 0); + } + RWMUTEX_WUNLOCK(arg->rwlock); + } while (!stop); + return NULL; +} + +static void * +hammer_rw_r(void *varg) +{ + hammer_rw_t *arg = varg; + int stop; + do { + RWMUTEX_RLOCK(arg->rwlock); + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) >= 0); + ATOMIC_INC(&arg->is_locked); + } + stop = arg->stop; + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) > 0); + ATOMIC_DEC(&arg->is_locked); + } + RWMUTEX_RUNLOCK(arg->rwlock); + } while (!stop); + return NULL; +} + + +static ERL_NIF_TERM hammer_rw_test(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + hammer_rw_t arg; + char buf[10]; + int res, freqread, i; + THR_ID tid[HAMMER_RW_NO_THREADS]; + + if (argc != 1) + goto badarg; + + arg.lock_check = get_bool(env, argv[0]); + if (arg.lock_check < 0) + goto badarg; + + ATOMIC_INIT(&arg.is_locked, 0); + + freqread = 0; + + restart: + arg.stop = 0; + arg.count = 0; + + arg.rwlock = RWMUTEX_CREATE(freqread); + + ASSERT(arg.rwlock); + + for (i = 0; i < HAMMER_RW_NO_W_THREADS; i++) + ASSERT(THR_CREATE(&tid[i], hammer_rw_w, &arg, NULL) == 0); + for (; i < HAMMER_RW_NO_THREADS; i++) + ASSERT(THR_CREATE(&tid[i], hammer_rw_r, &arg, NULL) == 0); + for (i = 0; i < HAMMER_RW_NO_THREADS; i++) + ASSERT(THR_JOIN(tid[i], NULL) == 0); + + ASSERT(!ATOMIC_READ(&arg.is_locked)); + + RWMUTEX_DESTROY(arg.rwlock); + + if (HAVE_FREQREAD_SUPPORT && !freqread) { + freqread = 1; + goto restart; + } + + if (freqread) + return enif_make_atom(env, "ok"); + else + return enif_make_tuple2(env, + enif_make_atom(env, + "comment"), + enif_make_string(env, + "No frequent read test made.", + ERL_NIF_LATIN1)); + badarg: + return enif_make_badarg(env); +} + +/* + * Hammer try rwlock testcase + */ + +#define HAMMER_TRYRW_NO_W_THREADS 10 +#define HAMMER_TRYRW_NO_THREADS 20 +#define HAMMER_TRYRW_NO_WLOCK_COUNT 10000000 +#define HAMMER_TRYRW_NO_RLOCK_COUNT 10000000 +#define HAMMER_TRYRW_NO_WLOCK_WAIT_COUNT ((10*HAMMER_TRYRW_NO_WLOCK_COUNT)/8) +#define HAMMER_TRYRW_NO_RLOCK_WAIT_COUNT ((10*HAMMER_TRYRW_NO_RLOCK_COUNT)/8) + +typedef struct { + RWMUTEX_T *rwlock; + ATOMIC_T is_locked; + int lock_check; + int w_count; + ATOMIC_T r_count; +} hammer_tryrw_t; + +static void * +hammer_tryrw_w(void *varg) +{ + hammer_tryrw_t *arg = varg; + int stop = 0; + int wait = 0; + do { + while (EBUSY == RWMUTEX_TRYWLOCK(arg->rwlock)); + if (arg->lock_check) { + ASSERT(!ATOMIC_READ(&arg->is_locked)); + ATOMIC_SET(&arg->is_locked, -1); + } + if (++arg->w_count > HAMMER_TRYRW_NO_WLOCK_COUNT) + stop = 1; + else if (arg->w_count > HAMMER_TRYRW_NO_RLOCK_WAIT_COUNT) + wait = 1; + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) == -1); + ATOMIC_SET(&arg->is_locked, 0); + } + RWMUTEX_WUNLOCK(arg->rwlock); + if (wait) + milli_sleep(1); + } while (!stop); + return NULL; +} + +static void * +hammer_tryrw_r(void *varg) +{ + hammer_tryrw_t *arg = varg; + long r_count; + int stop = 0; + int wait = 0; + do { + while (EBUSY == RWMUTEX_TRYRLOCK(arg->rwlock)); + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) >= 0); + ATOMIC_INC(&arg->is_locked); + } + ATOMIC_INC(&arg->r_count); + r_count = ATOMIC_READ(&arg->r_count); + if (r_count > HAMMER_TRYRW_NO_RLOCK_COUNT) + stop = 1; + else if (r_count > HAMMER_TRYRW_NO_RLOCK_WAIT_COUNT) + wait = 1; + if (arg->lock_check) { + ASSERT(ATOMIC_READ(&arg->is_locked) > 0); + ATOMIC_DEC(&arg->is_locked); + } + RWMUTEX_RUNLOCK(arg->rwlock); + if (wait) + milli_sleep(1); + } while (!stop); + return NULL; +} + + +static ERL_NIF_TERM hammer_tryrw_test(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + hammer_tryrw_t arg; + char buf[10]; + int res, freqread, i; + THR_ID tid[HAMMER_TRYRW_NO_THREADS]; + + if (argc != 1) + goto badarg; + + arg.lock_check = get_bool(env, argv[0]); + if (arg.lock_check < 0) + goto badarg; + + ATOMIC_INIT(&arg.is_locked, 0); + freqread = 0; + + restart: + + arg.w_count = 0; + ATOMIC_INIT(&arg.r_count, 0); + + arg.rwlock = RWMUTEX_CREATE(freqread); + + ASSERT(arg.rwlock); + + for (i = 0; i < HAMMER_TRYRW_NO_W_THREADS; i++) + ASSERT(THR_CREATE(&tid[i], hammer_tryrw_w, &arg, NULL) == 0); + for (; i < HAMMER_TRYRW_NO_THREADS; i++) + ASSERT(THR_CREATE(&tid[i], hammer_tryrw_r, &arg, NULL) == 0); + for (i = 0; i < HAMMER_TRYRW_NO_THREADS; i++) + ASSERT(THR_JOIN(tid[i], NULL) == 0); + + ASSERT(!ATOMIC_READ(&arg.is_locked)); + + RWMUTEX_DESTROY(arg.rwlock); + + if (HAVE_FREQREAD_SUPPORT && !freqread) { + freqread = 1; + goto restart; + } + + if (freqread) + return enif_make_atom(env, "ok"); + else + return enif_make_tuple2(env, + enif_make_atom(env, + "comment"), + enif_make_string(env, + "No frequent read test made.", + ERL_NIF_LATIN1)); + badarg: + return enif_make_badarg(env); +} + +typedef struct { + int lock_check; + ATOMIC_T is_locked; + RWMUTEX_T *rwlock; +} rwlock_resource_t; + +static void +rwlock_destructor(ErlNifEnv* env, void* obj) +{ + rwlock_resource_t *rwlr = obj; + if (rwlr->lock_check) + ASSERT(!ATOMIC_READ(&rwlr->is_locked)); + RWMUTEX_DESTROY(rwlr->rwlock); +} + +/* + * create_rwlock(FreqRead, LockCheck) + */ + +static ERL_NIF_TERM +create_rwlock(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + int lock_check, freqread; + ERL_NIF_TERM rwlock_term; + rwlock_resource_t *rwlr; + char buf[100]; + + if (argc != 2) + goto badarg; + + freqread = get_bool(env, argv[0]); + if (freqread < 0) + goto badarg; + + if (!HAVE_FREQREAD_SUPPORT && freqread) + return enif_make_atom(env, "enotsup"); + + lock_check = get_bool(env, argv[1]); + if (lock_check < 0) + goto badarg; + + rwlr = enif_alloc_resource(enif_priv_data(env), sizeof(rwlock_resource_t)); + rwlr->lock_check = lock_check; + ATOMIC_INIT(&rwlr->is_locked, 0); + rwlr->rwlock = RWMUTEX_CREATE(freqread); + rwlock_term = enif_make_resource(env, rwlr); + enif_release_resource(rwlr); + return rwlock_term; + + badarg: + return enif_make_badarg(env); +} + +/* + * rwlock_op(RWLock, Blocking, WriteOp, WaitTime) + */ + +static ERL_NIF_TERM +rwlock_op(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) +{ + rwlock_resource_t *rwlr; + int blocking, write, wait_locked, wait_unlocked; + + if (argc != 5) + goto badarg; + + if (!enif_get_resource(env, argv[0], enif_priv_data(env), (void **) &rwlr)) + goto badarg; + + blocking = get_bool(env, argv[1]); + if (blocking < 0) + goto badarg; + + write = get_bool(env, argv[2]); + if (write < 0) + goto badarg; + + if (!enif_get_int(env, argv[3], &wait_locked)) + goto badarg; + if (wait_locked < 0) + goto badarg; + + if (!enif_get_int(env, argv[4], &wait_unlocked)) + goto badarg; + if (wait_unlocked < 0) + goto badarg; + + if (write) { + if (blocking) + RWMUTEX_WLOCK(rwlr->rwlock); + else + while (EBUSY == RWMUTEX_TRYWLOCK(rwlr->rwlock)); + if (rwlr->lock_check) { + ASSERT(!ATOMIC_READ(&rwlr->is_locked)); + ATOMIC_SET(&rwlr->is_locked, -1); + } + } + else { + if (blocking) + RWMUTEX_RLOCK(rwlr->rwlock); + else + while (EBUSY == RWMUTEX_TRYRLOCK(rwlr->rwlock)); + if (rwlr->lock_check) { + ASSERT(ATOMIC_READ(&rwlr->is_locked) >= 0); + ATOMIC_INC(&rwlr->is_locked); + } + } + + if (wait_locked) + milli_sleep(wait_locked); + + if (write) { + if (rwlr->lock_check) { + ASSERT(ATOMIC_READ(&rwlr->is_locked) == -1); + ATOMIC_SET(&rwlr->is_locked, 0); + } + RWMUTEX_WUNLOCK(rwlr->rwlock); + } + else { + if (rwlr->lock_check) { + ASSERT(ATOMIC_READ(&rwlr->is_locked) > 0); + ATOMIC_DEC(&rwlr->is_locked); + } + RWMUTEX_RUNLOCK(rwlr->rwlock); + } + + if (wait_unlocked) + milli_sleep(wait_unlocked); + + return enif_make_atom(env, "ok"); + badarg: + return enif_make_badarg(env); +} + +static int load_nif_lib(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) +{ + *priv_data = enif_open_resource_type(env, + NULL, + "rwlock_resource", + rwlock_destructor, + ERL_NIF_RT_CREATE, + NULL); + if (*priv_data) + return 0; + else + return -1; +} + +/* + * 0 -> false + * >0 -> true + * <0 -> error + */ + +static int +get_bool(ErlNifEnv* env, ERL_NIF_TERM term) +{ + int res; + char buf[10]; + + res = enif_get_atom(env, term, buf, sizeof(buf), ERL_NIF_LATIN1); + if (res == 0) + return -1; + if (strcmp("false", buf) == 0) + return 0; + else if (strcmp("true", buf) == 0) + return 1; + else + return -1; +} + +static int +fail(const char *file, int line, const char *function, const char *assertion) +{ + fprintf(stderr, "%s:%d: Assertion failed in %s(): %s\n", + file, line, function, assertion); + abort(); +} + +static void +milli_sleep(int ms) +{ +#ifdef __WIN32__ + Sleep(ms); +#else + while (erts_milli_sleep(ms) != 0); +#endif +} + +static ErlNifFunc nif_funcs[] = { + {"long_rw_test", 0, long_rw_test}, + {"hammer_rw_test", 1, hammer_rw_test}, + {"hammer_tryrw_test", 1, hammer_tryrw_test}, + {"create_rwlock", 2, create_rwlock}, + {"rwlock_op", 5, rwlock_op} +}; + +ERL_NIF_INIT(mtx_SUITE, nif_funcs, load_nif_lib, NULL, NULL, NULL) -- cgit v1.2.3 From f0fae4bebaa76a7608e09877da62ae84c365388d Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 5 Nov 2010 16:26:20 +0100 Subject: Don't use more reader groups than schedulers --- erts/emulator/beam/erl_init.c | 2 ++ 1 file changed, 2 insertions(+) (limited to 'erts') diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 4ae656a3ad..e2f2cccb7e 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -786,6 +786,8 @@ early_init(int *argc, char **argv) /* elid.mem.ll.free = ethr_ll_free; #ifdef ERTS_SMP + if (erts_max_main_threads > no_schedulers) + erts_max_main_threads = no_schedulers; elid.main_threads = erts_max_main_threads; #else elid.main_threads = 1; -- cgit v1.2.3 From 80570513a1f121d89543c4c5b11fa5041cc3df7f Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 5 Nov 2010 15:32:34 +0100 Subject: Miscellaneous rwmutex bug fixes and improvements The ERTS internal rwlock implementation could get into an inconsistent state. This bug was very seldom triggered, but could be during heavy contention. The bug was introduced in R14B (erts-5.8.1). The bug was most likely to be triggered when using the read_concurrency option on an ETS table that was frequently accessed from multiple processes doing lots of writes and reads. That is, in a situation where you typically don't want to use the read_concurrency option in the first place. --- erts/include/internal/ethr_mutex.h | 145 +++++++- erts/include/internal/i386/atomic.h | 49 ++- erts/include/internal/sparc32/atomic.h | 35 +- erts/lib_src/common/ethr_mutex.c | 631 +++++++++++++++++++-------------- 4 files changed, 588 insertions(+), 272 deletions(-) (limited to 'erts') diff --git a/erts/include/internal/ethr_mutex.h b/erts/include/internal/ethr_mutex.h index 8d9d5e3d08..636fdc1e2f 100644 --- a/erts/include/internal/ethr_mutex.h +++ b/erts/include/internal/ethr_mutex.h @@ -33,6 +33,13 @@ # define ETHR_MTX_HARD_DEBUG #endif +#if 0 +# define ETHR_MTX_CHK_EXCL +#if 1 +# define ETHR_MTX_CHK_NON_EXCL +#endif +#endif + #ifdef ETHR_MTX_HARD_DEBUG # ifdef __GNUC__ # warning ETHR_MTX_HARD_DEBUG @@ -49,6 +56,15 @@ #if defined(ETHR_USE_OWN_RWMTX_IMPL__) || defined(ETHR_USE_OWN_MTX_IMPL__) +#ifdef ETHR_DEBUG +# ifndef ETHR_MTX_CHK_EXCL +# define ETHR_MTX_CHK_EXCL +# endif +# ifndef ETHR_MTX_CHK_NON_EXCL +# define ETHR_MTX_CHK_NON_EXCL +# endif +#endif + #if 0 # define ETHR_MTX_Q_LOCK_SPINLOCK__ # define ETHR_MTX_QLOCK_TYPE__ ethr_spinlock_t @@ -68,8 +84,8 @@ /* frequent read kind */ #define ETHR_RWMTX_R_FLG__ (((long) 1) << 28) -#define ETHR_RWMTX_R_PEND_UNLCK_MASK__ (ETHR_RWMTX_R_FLG__ - 1) -#define ETHR_RWMTX_R_MASK__ (ETHR_RWMTX_R_WAIT_FLG__ - 1) +#define ETHR_RWMTX_R_ABRT_UNLCK_FLG__ (((long) 1) << 27) +#define ETHR_RWMTX_R_PEND_UNLCK_MASK__ (ETHR_RWMTX_R_ABRT_UNLCK_FLG__ - 1) /* normal kind */ #define ETHR_RWMTX_RS_MASK__ (ETHR_RWMTX_R_WAIT_FLG__ - 1) @@ -91,6 +107,12 @@ struct ethr_mutex_base_ { #ifdef ETHR_MTX_HARD_DEBUG_WSQ int ws; #endif +#ifdef ETHR_MTX_CHK_EXCL + ethr_atomic_t exclusive; +#endif +#ifdef ETHR_MTX_CHK_NON_EXCL + ethr_atomic_t non_exclusive; +#endif #ifdef ETHR_MTX_HARD_DEBUG_LFS ethr_atomic_t hdbg_lfs; #endif @@ -344,6 +366,116 @@ do { \ #define ETHR_MTX_HARD_DEBUG_FENCE_INIT(X) #endif +#ifdef ETHR_MTX_CHK_EXCL + +#if !defined(ETHR_DEBUG) && defined(__GNUC__) +#warning "check exclusive is enabled" +#endif + +# define ETHR_MTX_CHK_EXCL_INIT__(MTXB) \ + ethr_atomic_init(&(MTXB)->exclusive, 0) + +# define ETHR_MTX_CHK_EXCL_IS_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + if (!ethr_atomic_read(&(MTXB)->exclusive)) \ + ethr_assert_failed(__FILE__, __LINE__, __func__,\ + "is exclusive"); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_IS_NOT_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + if (ethr_atomic_read(&(MTXB)->exclusive)) \ + ethr_assert_failed(__FILE__, __LINE__, __func__,\ + "is not exclusive"); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_SET_EXCL(MTXB) \ +do { \ + ETHR_MTX_CHK_EXCL_IS_NOT_EXCL((MTXB)); \ + ethr_atomic_set(&(MTXB)->exclusive, 1); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_UNSET_EXCL(MTXB) \ +do { \ + ETHR_MTX_CHK_EXCL_IS_EXCL((MTXB)); \ + ethr_atomic_set(&(MTXB)->exclusive, 0); \ + ETHR_COMPILER_BARRIER; \ +} while (0) + +#ifdef ETHR_MTX_CHK_NON_EXCL + +#if !defined(ETHR_DEBUG) && defined(__GNUC__) +#warning "check non-exclusive is enabled" +#endif + +# define ETHR_MTX_CHK_NON_EXCL_INIT__(MTXB) \ + ethr_atomic_init(&(MTXB)->non_exclusive, 0) +# define ETHR_MTX_CHK_EXCL_IS_NON_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + if (!ethr_atomic_read(&(MTXB)->non_exclusive)) \ + ethr_assert_failed(__FILE__, __LINE__, __func__,\ + "is non-exclusive"); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + if (ethr_atomic_read(&(MTXB)->non_exclusive)) \ + ethr_assert_failed(__FILE__, __LINE__, __func__,\ + "is not non-exclusive"); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + ethr_atomic_inc(&(MTXB)->non_exclusive); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL_NO(MTXB, NO) \ +do { \ + ETHR_COMPILER_BARRIER; \ + ethr_atomic_add(&(MTXB)->non_exclusive, (NO)); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +# define ETHR_MTX_CHK_EXCL_UNSET_NON_EXCL(MTXB) \ +do { \ + ETHR_COMPILER_BARRIER; \ + ethr_atomic_dec(&(MTXB)->non_exclusive); \ + ETHR_COMPILER_BARRIER; \ +} while (0) +#else +# define ETHR_MTX_CHK_NON_EXCL_INIT__(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL_NO(MTXB, NO) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_UNSET_NON_EXCL(MTXB) +#endif + +#else +# define ETHR_MTX_CHK_EXCL_INIT__(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_NOT_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_SET_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_UNSET_EXCL(MTXB) +# define ETHR_MTX_CHK_NON_EXCL_INIT__(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL_NO(MTXB, NO) +# define ETHR_MTX_CHK_EXCL_SET_NON_EXCL(MTXB) +# define ETHR_MTX_CHK_EXCL_UNSET_NON_EXCL(MTXB) +#endif + +# define ETHR_MTX_CHK_EXCL_INIT(MTXB) \ +do { \ + ETHR_MTX_CHK_EXCL_INIT__((MTXB)); \ + ETHR_MTX_CHK_NON_EXCL_INIT__((MTXB)); \ +} while (0) + + #ifdef ETHR_USE_OWN_MTX_IMPL__ #define ETHR_MTX_DEFAULT_MAIN_SPINCOUNT_MAX 2000 @@ -369,6 +501,11 @@ ETHR_INLINE_FUNC_NAME_(ethr_mutex_trylock)(ethr_mutex *mtx) act = ethr_atomic_cmpxchg_acqb(&mtx->mtxb.flgs, ETHR_RWMTX_W_FLG__, 0); res = (act == 0) ? 0 : EBUSY; +#ifdef ETHR_MTX_CHK_EXCL + if (res == 0) + ETHR_MTX_CHK_EXCL_SET_EXCL(&mtx->mtxb); +#endif + ETHR_MTX_HARD_DEBUG_LFS_TRYRWLOCK(&mtx->mtxb, res); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); @@ -386,6 +523,8 @@ ETHR_INLINE_FUNC_NAME_(ethr_mutex_lock)(ethr_mutex *mtx) if (act != 0) ethr_mutex_lock_wait__(mtx, act); + ETHR_MTX_CHK_EXCL_SET_EXCL(&mtx->mtxb); + ETHR_MTX_HARD_DEBUG_LFS_RWLOCK(&mtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); @@ -400,6 +539,8 @@ ETHR_INLINE_FUNC_NAME_(ethr_mutex_unlock)(ethr_mutex *mtx) ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); ETHR_MTX_HARD_DEBUG_LFS_RWUNLOCK(&mtx->mtxb); + ETHR_MTX_CHK_EXCL_UNSET_EXCL(&mtx->mtxb); + act = ethr_atomic_cmpxchg_relb(&mtx->mtxb.flgs, 0, ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) ethr_mutex_unlock_wake__(mtx, act); diff --git a/erts/include/internal/i386/atomic.h b/erts/include/internal/i386/atomic.h index f28258059f..52d01aab32 100644 --- a/erts/include/internal/i386/atomic.h +++ b/erts/include/internal/i386/atomic.h @@ -167,15 +167,52 @@ ethr_native_atomic_xchg(ethr_native_atomic_t *var, long val) * Atomic ops with at least specified barriers. */ -#define ethr_native_atomic_read_acqb ethr_native_atomic_read -#define ethr_native_atomic_inc_return_acqb ethr_native_atomic_inc_return +static ETHR_INLINE long +ethr_native_atomic_read_acqb(ethr_native_atomic_t *var) +{ + long val; +#if defined(__x86_64__) || !defined(ETHR_PRE_PENTIUM4_COMPAT) + val = var->counter; +#else + val = ethr_native_atomic_add_return(var, 0); +#endif + __asm__ __volatile__("" : : : "memory"); + return val; +} + +static ETHR_INLINE void +ethr_native_atomic_set_relb(ethr_native_atomic_t *var, long i) +{ + __asm__ __volatile__("" : : : "memory"); #if defined(__x86_64__) || !defined(ETHR_PRE_PENTIUM4_COMPAT) -#define ethr_native_atomic_set_relb ethr_native_atomic_set + var->counter = i; #else -#define ethr_native_atomic_set_relb ethr_native_atomic_xchg + (void) ethr_native_atomic_xchg(var, i); #endif -#define ethr_native_atomic_dec_relb ethr_native_atomic_dec -#define ethr_native_atomic_dec_return_relb ethr_native_atomic_dec_return +} + +static ETHR_INLINE long +ethr_native_atomic_inc_return_acqb(ethr_native_atomic_t *var) +{ + long res = ethr_native_atomic_inc_return(var); + __asm__ __volatile__("" : : : "memory"); + return res; +} + +static ETHR_INLINE void +ethr_native_atomic_dec_relb(ethr_native_atomic_t *var) +{ + __asm__ __volatile__("" : : : "memory"); + ethr_native_atomic_dec(var); +} + +static ETHR_INLINE long +ethr_native_atomic_dec_return_relb(ethr_native_atomic_t *var) +{ + __asm__ __volatile__("" : : : "memory"); + return ethr_native_atomic_dec_return(var); +} + #define ethr_native_atomic_cmpxchg_acqb ethr_native_atomic_cmpxchg #define ethr_native_atomic_cmpxchg_relb ethr_native_atomic_cmpxchg diff --git a/erts/include/internal/sparc32/atomic.h b/erts/include/internal/sparc32/atomic.h index 2a995d4465..2da6472393 100644 --- a/erts/include/internal/sparc32/atomic.h +++ b/erts/include/internal/sparc32/atomic.h @@ -176,38 +176,59 @@ ethr_native_atomic_cmpxchg(ethr_native_atomic_t *var, long new, long old) * Atomic ops with at least specified barriers. */ +/* TODO: relax acquire barriers */ + static ETHR_INLINE long ethr_native_atomic_read_acqb(ethr_native_atomic_t *var) { long res = ethr_native_atomic_read(var); - __asm__ __volatile__("membar #StoreLoad|#StoreStore"); + __asm__ __volatile__("membar #LoadLoad|#LoadStore|#StoreLoad|#StoreStore" : : : "memory"); return res; } static ETHR_INLINE void ethr_native_atomic_set_relb(ethr_native_atomic_t *var, long i) { - __asm__ __volatile__("membar #LoadStore|#StoreStore"); + __asm__ __volatile__("membar #LoadStore|#StoreStore" : : : "memory"); ethr_native_atomic_set(var, i); } +static ETHR_INLINE long +ethr_native_atomic_inc_return_acqb(ethr_native_atomic_t *var) +{ + long res = ethr_native_atomic_inc_return(var); + __asm__ __volatile__("membar #LoadLoad|#LoadStore" : : : "memory"); + return res; +} + static ETHR_INLINE void ethr_native_atomic_dec_relb(ethr_native_atomic_t *var) { - __asm__ __volatile__("membar #LoadStore|#StoreStore"); + __asm__ __volatile__("membar #LoadStore|#StoreStore" : : : "memory"); ethr_native_atomic_dec(var); } static ETHR_INLINE long ethr_native_atomic_dec_return_relb(ethr_native_atomic_t *var) { - __asm__ __volatile__("membar #LoadStore|#StoreStore"); + __asm__ __volatile__("membar #LoadStore|#StoreStore" : : : "memory"); return ethr_native_atomic_dec_return(var); } -#define ethr_native_atomic_inc_return_acqb ethr_native_atomic_inc_return -#define ethr_native_atomic_cmpxchg_acqb ethr_native_atomic_cmpxchg -#define ethr_native_atomic_cmpxchg_relb ethr_native_atomic_cmpxchg +static ETHR_INLINE long +ethr_native_atomic_cmpxchg_acqb(ethr_native_atomic_t *var, long new, long old) +{ + long res = ethr_native_atomic_cmpxchg(var, new, old); + __asm__ __volatile__("membar #LoadLoad|#LoadStore" : : : "memory"); + return res; +} + +static ETHR_INLINE long +ethr_native_atomic_cmpxchg_relb(ethr_native_atomic_t *var, long new, long old) +{ + __asm__ __volatile__("membar #LoadStore|#StoreStore" : : : "memory"); + return ethr_native_atomic_cmpxchg(var, new, old); +} #endif /* ETHR_TRY_INLINE_FUNCS */ diff --git a/erts/lib_src/common/ethr_mutex.c b/erts/lib_src/common/ethr_mutex.c index 78323b62a3..c5738c3d77 100644 --- a/erts/lib_src/common/ethr_mutex.c +++ b/erts/lib_src/common/ethr_mutex.c @@ -205,9 +205,14 @@ static void hard_debug_chk_q__(struct ethr_mutex_base_ *, int); #ifdef ETHR_USE_OWN_RWMTX_IMPL__ static void +rwmutex_transfer_read_lock(ethr_rwmutex *rwmtx, + long initial, + int q_locked); +static void rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, - long initial); + long initial, + int transfer_read_lock); static int rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, long initial, @@ -334,7 +339,7 @@ rwmutex_freqread_rdrs_read(ethr_rwmutex *rwmtx, int ix) ETHR_ASSERT(res >= 0); break; case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: - ETHR_ASSERT(res == 0 || res == 1); + ETHR_ASSERT(ix == 0 ? res >= 0 : (res == 0 || res == 1)); break; default: ETHR_ASSERT(0); @@ -400,6 +405,7 @@ event_wait(struct ethr_mutex_base_ *mtxb, int locked = 0; long act; int need_try_complete_runlock = 0; + int transfer_read_lock = 0; /* Need to enqueue and wait... */ @@ -444,8 +450,8 @@ event_wait(struct ethr_mutex_base_ *mtxb, while (1) { long new, exp = act; - int freqread_tryrlock = 0; need_try_complete_runlock = 0; + transfer_read_lock = 0; if (type == ETHR_RWMTX_W_WAIT_FLG__) { if (is_freq_read && act == ETHR_RWMTX_R_FLG__) @@ -465,14 +471,11 @@ event_wait(struct ethr_mutex_base_ *mtxb, new = act + 1; /* Try to get it */ } else { - if (act & ~ETHR_RWMTX_R_FLG__) - new = act | ETHR_RWMTX_R_WAIT_FLG__; - else { /* Try to get it */ - ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; - rwmutex_freqread_rdrs_inc(rwmtx, tse); - ETHR_MEMORY_BARRIER; - new = act | ETHR_RWMTX_R_FLG__; - freqread_tryrlock = 1; + new = act | ETHR_RWMTX_R_WAIT_FLG__; + if ((act & (ETHR_RWMTX_W_FLG__ + | ETHR_RWMTX_W_WAIT_FLG__)) == 0) { + /* Transfer read lock to this thread. */ + transfer_read_lock = 1; } } } @@ -488,24 +491,6 @@ event_wait(struct ethr_mutex_base_ *mtxb, goto done; } } - - if (freqread_tryrlock) { - ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; - - /* We didn't set ETHR_RWMTX_R_FLG__, however someone - else might have */ - if (act == ETHR_RWMTX_R_FLG__) - goto done; /* Got it by help from someone else */ - - ETHR_ASSERT((act & ETHR_RWMTX_WAIT_FLGS__) == 0); - /* - * We know that no waiter flags have been set, i.e., - * we cannot get into a situation where we need to wake - * someone up here. Just restore the readers counter - * and do it over again... - */ - rwmutex_freqread_rdrs_dec(rwmtx, tse); - } } /* Enqueue */ @@ -535,20 +520,36 @@ event_wait(struct ethr_mutex_base_ *mtxb, /* Wait */ locked = 0; - ETHR_MTX_Q_UNLOCK(&mtxb->qlck); - if (need_try_complete_runlock) { + ETHR_ASSERT(!(transfer_read_lock && need_try_complete_runlock)); + + if (transfer_read_lock) { ETHR_ASSERT(((ethr_rwmutex *) mtxb)->type != ETHR_RWMUTEX_TYPE_NORMAL); /* - * We were the only one in queue when we enqueued, and it - * was seemingly read locked. We need to try to complete a - * runlock otherwise we might be hanging forever. If the - * runlock could be completed we will be dequeued and - * woken by ourselves. + * We are the only one in the queue and we are not write + * locked; rwmutex_transfer_read_lock() will: + * - transfer a read lock to us (since we're first in q) + * - unlock the Q-lock */ - rwmutex_try_complete_runlock((ethr_rwmutex *) mtxb, - act, tse, 0, 1, 0); + rwmutex_transfer_read_lock(((ethr_rwmutex *) mtxb), act, 1); + } + else { + ETHR_MTX_Q_UNLOCK(&mtxb->qlck); + + if (need_try_complete_runlock) { + ETHR_ASSERT(((ethr_rwmutex *) mtxb)->type + != ETHR_RWMUTEX_TYPE_NORMAL); + /* + * We were the only one in queue when we enqueued, and it + * was seemingly read locked. We need to try to complete a + * runlock otherwise we might be hanging forever. If the + * runlock could be completed we will be dequeued and + * woken by ourselves. + */ + rwmutex_try_complete_runlock((ethr_rwmutex *) mtxb, + act, tse, 0, 1, 0); + } } while (1) { @@ -653,8 +654,6 @@ write_lock_wait(struct ethr_mutex_base_ *mtxb, ethr_ts_event *tse = NULL; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; int res; - int freq_read_size = -1; - int freq_read_start_ix = -1; ETHR_ASSERT(!is_freq_read || is_rwmtx); @@ -666,44 +665,23 @@ write_lock_wait(struct ethr_mutex_base_ *mtxb, */ while (1) { - long exp; - while (act != 0) { if (is_freq_read && act == ETHR_RWMTX_R_FLG__) { ethr_rwmutex *rwmtx = (ethr_rwmutex *) mtxb; + scnt--; if (!tse) tse = ethr_get_ts_event(); - if (freq_read_size < 0) { - if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { - freq_read_size = reader_groups_array_size; - freq_read_start_ix = tse->rgix; - } - else { - freq_read_size = main_threads_array_size; - freq_read_start_ix = tse->mtix; - } - } - res = check_readers_array(rwmtx, - freq_read_start_ix, - freq_read_size); - scnt--; - if (res == 0) { - act = ethr_atomic_read(&mtxb->flgs); - if (act & ETHR_RWMTX_R_MASK__) { - res = rwmutex_try_complete_runlock(rwmtx, act, - tse, 0, 0, - 1); - if (res != EBUSY) - goto done; /* Got it */ - } - if (scnt <= 0) - goto chk_spin; - if (--until_yield == 0) { - until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; - ETHR_YIELD(); - } - continue; + res = rwmutex_try_complete_runlock(rwmtx, act, + tse, 0, 0, + 1); + if (res != EBUSY) + goto done; /* Got it */ + if (scnt <= 0) + goto chk_spin; + if (--until_yield == 0) { + until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; + ETHR_YIELD(); } } @@ -728,11 +706,9 @@ write_lock_wait(struct ethr_mutex_base_ *mtxb, scnt--; } - exp = act; - act = ethr_atomic_cmpxchg_acqb(&mtxb->flgs, ETHR_RWMTX_W_FLG__, - exp); + 0); if (act == 0) goto done; /* Got it */ } @@ -753,6 +729,7 @@ mtxb_init(struct ethr_mutex_base_ *mtxb, #ifdef ETHR_MTX_HARD_DEBUG_WSQ mtxb->ws = 0; #endif + ETHR_MTX_CHK_EXCL_INIT(mtxb); if (no_spin) { mtxb->main_scnt = 0; mtxb->aux_scnt = 0; @@ -1254,7 +1231,7 @@ ethr_cond_wait(ethr_cond *cnd, ethr_mutex *mtx) ETHR_MTX_HARD_DEBUG_LFS_RWLOCK(&mtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(cnd); ETHR_MTX_HARD_DEBUG_FENCE_CHK(mtx); - + ETHR_MTX_CHK_EXCL_SET_EXCL(&mtx->mtxb); tse->udata = udata; ethr_leave_ts_event(tse); return 0; @@ -1499,7 +1476,62 @@ int check_readers_array(ethr_rwmutex *rwmtx, return 0; } -static ETHR_INLINE void +static void +rwmutex_freqread_rdrs_dec_chk_wakeup(ethr_rwmutex *rwmtx, + ethr_ts_event *tse, + long initial) +{ + long act = initial; + + if ((act & (ETHR_RWMTX_W_FLG__| + ETHR_RWMTX_R_ABRT_UNLCK_FLG__)) == 0) { + if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { + if (act & ETHR_RWMTX_R_PEND_UNLCK_MASK__) { + /* + * We *need* to try to complete the runlock. + * A writer that just enqueued (not seen by us + * in flag field) may depend on someone else + * completing the runlock. We just took over + * that responsibilty since we modified reader + * groups. + */ + rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 0, 0); + } + } + else if ((act & ETHR_RWMTX_WAIT_FLGS__) == ETHR_RWMTX_R_WAIT_FLG__) + rwmutex_transfer_read_lock(rwmtx, act, 0); + else if ((act & ETHR_RWMTX_WAIT_FLGS__) == ETHR_RWMTX_W_WAIT_FLG__) + rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 0, 0); + else { + /* + * Don't know if we got readers or writers + * first in queue; need to peek + */ + ETHR_MTX_Q_LOCK(&rwmtx->mtxb.qlck); + if (!rwmtx->mtxb.q) + ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); + else if (is_w_waiter(rwmtx->mtxb.q)) { + act = ethr_atomic_read(&rwmtx->mtxb.flgs); + ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); + if ((act & ETHR_RWMTX_W_FLG__) == 0) + rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 0, 0); + } + else { + /* + * rwmutex_transfer_read_lock() will + * unlock Q lock. + */ + act = ethr_atomic_read(&rwmtx->mtxb.flgs); + if (act & ETHR_RWMTX_W_FLG__) + ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); + else + rwmutex_transfer_read_lock(rwmtx, act, 1); + } + } + } +} + +static void rwmutex_freqread_restore_failed_tryrlock(ethr_rwmutex *rwmtx, ethr_ts_event *tse) { @@ -1509,24 +1541,11 @@ rwmutex_freqread_restore_failed_tryrlock(ethr_rwmutex *rwmtx, */ act = rwmutex_freqread_rdrs_dec_read(rwmtx, tse); - ETHR_WRITE_MEMORY_BARRIER; + ETHR_MEMORY_BARRIER; if (act == 0) { - -#ifndef ETHR_WRITE_MEMORY_BARRIER_IS_FULL - ETHR_READ_MEMORY_BARRIER; -#endif - act = ethr_atomic_read(&rwmtx->mtxb.flgs); - - if ((act & ETHR_RWMTX_W_FLG__) == 0 - && act & (ETHR_RWMTX_WAIT_FLGS__|ETHR_RWMTX_R_PEND_UNLCK_MASK__)) { - /* - * We either got waiters, or someone else trying - * to read unlock which we might have to help. - */ - rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 1, 0); - } + rwmutex_freqread_rdrs_dec_chk_wakeup(rwmtx, tse, act); } } @@ -1542,12 +1561,16 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, long act = initial; int six, res, length; + ETHR_ASSERT((act & ETHR_RWMTX_W_FLG__) == 0); + + if (act & ETHR_RWMTX_R_ABRT_UNLCK_FLG__) + return try_write_lock ? EBUSY : 0; + tse_tmp = tse; if (!tse_tmp) tse_tmp = ethr_get_ts_event(); - if ((act & ETHR_RWMTX_WAIT_FLGS__) - && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) + if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) goto check_waiters; if (rwmtx->type == ETHR_RWMUTEX_TYPE_FREQUENT_READ) { @@ -1569,14 +1592,21 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, if (check_before_try) { res = check_readers_array(rwmtx, six, length); + + ETHR_MEMORY_BARRIER; + if (res == EBUSY) return try_write_lock ? EBUSY : 0; } + restart: + while (1) { long exp = act; long new = act+1; + ETHR_ASSERT((act & ETHR_RWMTX_R_ABRT_UNLCK_FLG__) == 0); + ETHR_ASSERT((act & ETHR_RWMTX_R_PEND_UNLCK_MASK__) < ETHR_RWMTX_R_PEND_UNLCK_MASK__); @@ -1585,8 +1615,10 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, act = new; break; } + if (!try_write_lock) { - if (act == ETHR_RWMTX_W_FLG__ || act == 0) + if (act == 0 || (act & (ETHR_RWMTX_W_FLG__ + | ETHR_RWMTX_R_ABRT_UNLCK_FLG__))) return 0; if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { if ((act & ETHR_RWMTX_R_FLG__) == 0) @@ -1601,33 +1633,50 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, else { if (act == 0) goto tryrwlock; - if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_WAIT_FLGS__)) + if (act & (ETHR_RWMTX_W_FLG__ + | ETHR_RWMTX_R_ABRT_UNLCK_FLG__)) return EBUSY; } } res = check_readers_array(rwmtx, six, length); - if (res == EBUSY) { - act = ethr_atomic_dec_read(&rwmtx->mtxb.flgs); - if (act & ETHR_RWMTX_R_MASK__) - return try_write_lock ? EBUSY : 0; - } - else { - while (1) { - long exp = act; - long new = act; - new &= ~ETHR_RWMTX_R_FLG__; - new--; - ETHR_ASSERT(act & ETHR_RWMTX_R_PEND_UNLCK_MASK__); + ETHR_MEMORY_BARRIER; - act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); - if (exp == act) { - if (new & ETHR_RWMTX_R_PEND_UNLCK_MASK__) - return try_write_lock ? EBUSY : 0; - act = new; - break; + ETHR_ASSERT((act & ETHR_RWMTX_W_FLG__) == 0); + + while (1) { + int finished_abort = 0; + long exp = act; + long new = act; + + new--; + if (act & ETHR_RWMTX_R_ABRT_UNLCK_FLG__) { + if ((new & ETHR_RWMTX_R_PEND_UNLCK_MASK__) == 0) { + new &= ~ETHR_RWMTX_R_ABRT_UNLCK_FLG__; + finished_abort = 1; } + ETHR_ASSERT(act & ETHR_RWMTX_R_FLG__); + } + else if ((act & ETHR_RWMTX_R_FLG__) && res != EBUSY) { + new &= ~ETHR_RWMTX_R_FLG__; + } + + ETHR_ASSERT(act & ETHR_RWMTX_R_PEND_UNLCK_MASK__); + + act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); + if (exp == act) { + act = new; + if (act & ETHR_RWMTX_W_FLG__) + return try_write_lock ? EBUSY : 0; + if (finished_abort && (act & ETHR_RWMTX_WAIT_FLGS__)) + goto restart; + if (act & (ETHR_RWMTX_R_FLG__ + | ETHR_RWMTX_R_ABRT_UNLCK_FLG__ + | ETHR_RWMTX_R_PEND_UNLCK_MASK__)) + return try_write_lock ? EBUSY : 0; + /* Read unlock completed */ + break; } } @@ -1637,12 +1686,9 @@ rwmutex_try_complete_runlock(ethr_rwmutex *rwmtx, * to write lock it). */ - if (act & ETHR_RWMTX_W_FLG__) - return try_write_lock ? EBUSY : 0; - if (act & ETHR_RWMTX_WAIT_FLGS__) { check_waiters: - rwmutex_unlock_wake(rwmtx, 0, act); + rwmutex_unlock_wake(rwmtx, 0, act, 0); return try_write_lock ? EBUSY : 0; } @@ -1670,7 +1716,7 @@ rwmutex_incdec_restore_failed_tryrlock(ethr_rwmutex *rwmtx) act = ethr_atomic_dec_read(&rwmtx->mtxb.flgs); if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) { - rwmutex_unlock_wake(rwmtx, 0, act); + rwmutex_unlock_wake(rwmtx, 0, act, 0); } } @@ -1700,7 +1746,7 @@ rwmutex_normal_rlock_wait(ethr_rwmutex *rwmtx, #endif while (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { - if (scnt >= 0) { + if (scnt <= 0) { tse = ethr_get_ts_event(); if (update_spincount(&rwmtx->mtxb, tse, &start_scnt, &scnt)) { event_wait(&rwmtx->mtxb, tse, scnt, @@ -1736,10 +1782,83 @@ rwmutex_normal_rlock_wait(ethr_rwmutex *rwmtx, static void rwmutex_freqread_rlock_wait(ethr_rwmutex *rwmtx, - ethr_ts_event *tse, - long initial) + ethr_ts_event *tse); + +static int +rwmutex_freqread_rlock(ethr_rwmutex *rwmtx, ethr_ts_event *tse, int trylock) { - long act = initial; + int res = 0; + long act; + + rwmutex_freqread_rdrs_inc(rwmtx, tse); + + ETHR_MEMORY_BARRIER; + + act = ethr_atomic_read_acqb(&rwmtx->mtxb.flgs); + + if (act != ETHR_RWMTX_R_FLG__) { + int wake_other_readers; + + while (1) { + long exp, new; + + wake_other_readers = 0; + + if (act == 0) + new = act | ETHR_RWMTX_R_FLG__; + else if (act == ETHR_RWMTX_R_FLG__) + break; /* Got it */ + else if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { + rwmutex_freqread_restore_failed_tryrlock(rwmtx, tse); + if (trylock) + res = EBUSY; + else + rwmutex_freqread_rlock_wait(rwmtx, tse); + break; + } + else if (act & ETHR_RWMTX_R_ABRT_UNLCK_FLG__) { + if ((act & ETHR_RWMTX_R_FLG__) == 0) + ETHR_FATAL_ERROR__(EFAULT); + /* + * An aborted runlock, not write locked, and no write + * waiters, i.e., we got it... + */ + if (act & ETHR_RWMTX_R_WAIT_FLG__) + wake_other_readers = 1; + break; + } + else { + new = act | ETHR_RWMTX_R_FLG__; + if (act & ETHR_RWMTX_R_PEND_UNLCK_MASK__) { + /* + * Someone is doing tryrwlock (no writer and no + * write waiters); we will try to abort that... + */ + new |= ETHR_RWMTX_R_ABRT_UNLCK_FLG__; + } + + if (act & ETHR_RWMTX_R_WAIT_FLG__) + wake_other_readers = 1; + } + + exp = act; + act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, new, exp); + if (act == exp) + break; + } + + if (wake_other_readers) + rwmutex_transfer_read_lock(rwmtx, act, 0); + } + + return res; +} + +static void +rwmutex_freqread_rlock_wait(ethr_rwmutex *rwmtx, + ethr_ts_event *tse) +{ + long act; int scnt, start_scnt; int until_yield = ETHR_YIELD_AFTER_BUSY_LOOPS; @@ -1752,12 +1871,10 @@ rwmutex_freqread_rlock_wait(ethr_rwmutex *rwmtx, while (1) { - rwmutex_freqread_restore_failed_tryrlock(rwmtx, tse); - act = ethr_atomic_read(&rwmtx->mtxb.flgs); - while (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { - if (scnt >= 0) { + while (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { + if (scnt <= 0) { if (update_spincount(&rwmtx->mtxb, tse, &start_scnt, &scnt)) { event_wait(&rwmtx->mtxb, tse, scnt, ETHR_RWMTX_R_WAIT_FLG__, 1, 1); @@ -1773,30 +1890,8 @@ rwmutex_freqread_rlock_wait(ethr_rwmutex *rwmtx, scnt--; } - rwmutex_freqread_rdrs_inc(rwmtx, tse); - - ETHR_MEMORY_BARRIER; - - act = ethr_atomic_read(&rwmtx->mtxb.flgs); - - if (act == ETHR_RWMTX_R_FLG__) - return; /* Got it */ - - while (1) { - long exp, new; - - if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) - break; /* Busy (need to restore inc) */ - - if (act & ETHR_RWMTX_R_FLG__) - return; /* Got it */ - - exp = act; - new = act | ETHR_RWMTX_R_FLG__; - act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); - if (act == exp) - return; /* Got it */ - } + if (rwmutex_freqread_rlock(rwmtx, tse, 1) != EBUSY) + break; /* Got it */ } } @@ -1816,14 +1911,23 @@ static ETHR_INLINE void rwlock_wake_set_flags(ethr_rwmutex *rwmtx, long new_initial, int act_initial) { long act, act_mask; + int chk_abrt_flg; + + ETHR_MEMORY_BARRIER; + if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) { /* r pend unlock mask may vary and must be retained */ act_mask = ETHR_RWMTX_R_PEND_UNLCK_MASK__; + if (new_initial & ETHR_RWMTX_R_FLG__) + chk_abrt_flg = 1; + else + chk_abrt_flg = 0; } else { #ifdef ETHR_RLOCK_WITH_INC_DEC /* rs mask may vary and must be retained */ act_mask = ETHR_RWMTX_RS_MASK__; + chk_abrt_flg = 0; #else /* rs mask always zero */ ETHR_ASSERT((act_initial & ETHR_RWMTX_RS_MASK__) == 0); @@ -1836,6 +1940,8 @@ rwlock_wake_set_flags(ethr_rwmutex *rwmtx, long new_initial, int act_initial) while (1) { long exp = act; long new = new_initial + (act & act_mask); + if (chk_abrt_flg && (act & act_mask)) + new |= ETHR_RWMTX_R_ABRT_UNLCK_FLG__; act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); if (act == exp) break; @@ -1883,6 +1989,14 @@ dbg_unlock_wake(ethr_rwmutex *rwmtx, exp |= ETHR_RWMTX_R_WAIT_FLG__; if (rwmtx->rq_end->next != rwmtx->mtxb.q) exp |= ETHR_RWMTX_W_WAIT_FLG__; + else if (exp == ETHR_RWMTX_R_WAIT_FLG__) { + if (!have_w) { + if (rwmtx->type != ETHR_RWMUTEX_TYPE_NORMAL) + imask |= ETHR_RWMTX_R_FLG__; + else + imask |= ETHR_RWMTX_RS_MASK__; + } + } act = ethr_atomic_read(&rwmtx->mtxb.flgs); ETHR_ASSERT((exp & ~imask) == (act & ~imask)); @@ -1894,41 +2008,83 @@ dbg_unlock_wake(ethr_rwmutex *rwmtx, #endif static void -rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial) +rwmutex_transfer_read_lock(ethr_rwmutex *rwmtx, long initial, int q_locked) +{ + long act = initial; + + if (!q_locked) { + ethr_ts_event *tse; + ETHR_ASSERT(initial & ETHR_RWMTX_R_WAIT_FLG__); + ETHR_ASSERT((initial & ETHR_RWMTX_W_FLG__) == 0); + ETHR_MTX_Q_LOCK(&rwmtx->mtxb.qlck); + + act = ethr_atomic_read(&rwmtx->mtxb.flgs); + tse = rwmtx->mtxb.q; + if ((act & ETHR_RWMTX_W_FLG__) || !tse || is_w_waiter(tse)) { + /* Someone else woke the readers up... */ + ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); + return; + } + } + + rwmutex_unlock_wake(rwmtx, 0, initial, 1); +} + +static void +rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial, + int transfer_read_lock) { long new, act = initial; ethr_ts_event *tse; - if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { - if (!have_w) - return; - else { - while ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { - long exp = act; - new = exp & ~ETHR_RWMTX_W_FLG__; - act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); - if (act == exp) - return; + if (transfer_read_lock) { + /* + * - Q already locked + * - Got R waiters first in Q + * - Not W locked + */ + tse = rwmtx->mtxb.q; + + ETHR_ASSERT(act & ETHR_RWMTX_R_WAIT_FLG__); + ETHR_ASSERT((act & (ETHR_RWMTX_W_FLG__)) == 0); + ETHR_ASSERT(tse && !is_w_waiter(tse)); + } + else { + + if ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { + if (!have_w) + return; + else { + while ((act & ETHR_RWMTX_WAIT_FLGS__) == 0) { + long exp = act; + new = exp & ~ETHR_RWMTX_W_FLG__; + act = ethr_atomic_cmpxchg(&rwmtx->mtxb.flgs, new, exp); + if (act == exp) + return; + } } } - } - ETHR_MTX_Q_LOCK(&rwmtx->mtxb.qlck); - tse = rwmtx->mtxb.q; + ETHR_MTX_Q_LOCK(&rwmtx->mtxb.qlck); + tse = rwmtx->mtxb.q; - if (!have_w) { - if (!tse) { + if (!have_w) { + if (!tse) { #ifdef ETHR_DEBUG - act = ethr_atomic_read(&rwmtx->mtxb.flgs); - ETHR_ASSERT((act & ETHR_RWMTX_WAIT_FLGS__) == 0); + act = ethr_atomic_read(&rwmtx->mtxb.flgs); + ETHR_ASSERT((act & ETHR_RWMTX_WAIT_FLGS__) == 0); #endif - goto already_served; - } - act = ethr_atomic_read(&rwmtx->mtxb.flgs); - if (act & ~ETHR_RWMTX_WAIT_FLGS__) { - already_served: - ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); - return; + goto already_served; + } + act = ethr_atomic_read(&rwmtx->mtxb.flgs); + if (act == (ETHR_RWMTX_R_WAIT_FLG__|ETHR_RWMTX_R_FLG__)) { + ETHR_ASSERT(tse && !is_w_waiter(tse)); + } + else if (act & ~ETHR_RWMTX_WAIT_FLGS__) { + already_served: + ETHR_MTX_Q_UNLOCK(&rwmtx->mtxb.qlck); + return; + } } } @@ -1988,6 +2144,7 @@ rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial) rwmutex_freqread_rdrs_add(rwmtx, type, ix, wrs); } } + new = ETHR_RWMTX_R_FLG__; } @@ -1995,6 +2152,7 @@ rwmutex_unlock_wake(ethr_rwmutex *rwmtx, int have_w, long initial) new |= ETHR_RWMTX_W_WAIT_FLG__; rwlock_wake_set_flags(rwmtx, new, act); + wake_readers(rwmtx, rs); } } @@ -2225,39 +2383,19 @@ ethr_rwmutex_tryrlock(ethr_rwmutex *rwmtx) case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { ethr_ts_event *tse = ethr_get_ts_event(); - - rwmutex_freqread_rdrs_inc(rwmtx, tse); - - ETHR_MEMORY_BARRIER; - - act = ethr_atomic_read_acqb(&rwmtx->mtxb.flgs); - - if (act != ETHR_RWMTX_R_FLG__) { - while (1) { - long exp, new; - - if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { - rwmutex_freqread_restore_failed_tryrlock(rwmtx, tse); - res = EBUSY; - break; - } - - if (act & ETHR_RWMTX_R_FLG__) - break; - - exp = act; - new = act | ETHR_RWMTX_R_FLG__; - act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, new, exp); - if (act == exp) - break; - } - } - + res = rwmutex_freqread_rlock(rwmtx, tse, 1); ethr_leave_ts_event(tse); break; } } +#ifdef ETHR_MTX_CHK_EXCL + if (res == 0) { + ETHR_MTX_CHK_EXCL_SET_NON_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_IS_NOT_EXCL(&rwmtx->mtxb); + } +#endif + ETHR_MTX_HARD_DEBUG_LFS_TRYRLOCK(&rwmtx->mtxb, res); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); @@ -2286,9 +2424,8 @@ ethr_rwmutex_rlock(ethr_rwmutex *rwmtx) while (1) { act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, exp+1, exp); - if (act == exp) { + if (act == exp) break; - } if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_W_WAIT_FLG__)) { rwmutex_normal_rlock_wait(rwmtx, act); @@ -2303,38 +2440,14 @@ ethr_rwmutex_rlock(ethr_rwmutex *rwmtx) case ETHR_RWMUTEX_TYPE_FREQUENT_READ: case ETHR_RWMUTEX_TYPE_EXTREMELY_FREQUENT_READ: { ethr_ts_event *tse = ethr_get_ts_event(); - - rwmutex_freqread_rdrs_inc(rwmtx, tse); - - ETHR_MEMORY_BARRIER; - - act = ethr_atomic_read_acqb(&rwmtx->mtxb.flgs); - - if (act != ETHR_RWMTX_R_FLG__) { - while (1) { - long exp, new; - - if (act & ~(ETHR_RWMTX_R_FLG__|ETHR_RWMTX_R_WAIT_FLG__)) { - rwmutex_freqread_rlock_wait(rwmtx, tse, act); - break; - } - - if (act & ETHR_RWMTX_R_FLG__) - break; - - exp = act; - new = act | ETHR_RWMTX_R_FLG__; - act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, new, exp); - if (act == exp) - break; - } - } - + rwmutex_freqread_rlock(rwmtx, tse, 0); ethr_leave_ts_event(tse); break; } } + ETHR_MTX_CHK_EXCL_SET_NON_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_IS_NOT_EXCL(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_LFS_RLOCK(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); } @@ -2344,6 +2457,8 @@ ethr_rwmutex_runlock(ethr_rwmutex *rwmtx) { long act; + ETHR_MTX_CHK_EXCL_IS_NOT_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_UNSET_NON_EXCL(&rwmtx->mtxb); ETHR_ASSERT(!ethr_not_inited__); ETHR_ASSERT(rwmtx); ETHR_ASSERT(rwmtx->initialized == ETHR_RWMUTEX_INITIALIZED); @@ -2357,7 +2472,7 @@ ethr_rwmutex_runlock(ethr_rwmutex *rwmtx) if ((act & ETHR_RWMTX_WAIT_FLGS__) && (act & ~ETHR_RWMTX_WAIT_FLGS__) == 0) { ETHR_ASSERT((act & ETHR_RWMTX_W_FLG__) == 0); - rwmutex_unlock_wake(rwmtx, 0, act); + rwmutex_unlock_wake(rwmtx, 0, act, 0); } break; @@ -2369,21 +2484,12 @@ ethr_rwmutex_runlock(ethr_rwmutex *rwmtx) ETHR_ASSERT(act >= 0); - ETHR_WRITE_MEMORY_BARRIER; + ETHR_MEMORY_BARRIER; if (act == 0) { - -#ifndef ETHR_WRITE_MEMORY_BARRIER_IS_FULL - ETHR_READ_MEMORY_BARRIER; -#endif act = ethr_atomic_read(&rwmtx->mtxb.flgs); - - if ((act & ETHR_RWMTX_W_FLG__) == 0 - && (act & (ETHR_RWMTX_WAIT_FLGS__ - | ETHR_RWMTX_R_PEND_UNLCK_MASK__))) { - rwmutex_try_complete_runlock(rwmtx, act, tse, 1, 0, 0); - } - + if (act != ETHR_RWMTX_R_FLG__) + rwmutex_freqread_rdrs_dec_chk_wakeup(rwmtx, tse, act); } ethr_leave_ts_event(tse); @@ -2422,25 +2528,31 @@ ethr_rwmutex_tryrwlock(ethr_rwmutex *rwmtx) do { - if (act & (ETHR_RWMTX_W_FLG__|ETHR_RWMTX_WAIT_FLGS__)) { - res = EBUSY; - break; - } - - if (act & ETHR_RWMTX_R_MASK__) { + if (act == 0) + act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, + ETHR_RWMTX_W_FLG__, 0); + else if (act == ETHR_RWMTX_R_FLG__) { res = rwmutex_try_complete_runlock(rwmtx, act, NULL, 0, 1, 1); break; } - - act = ethr_atomic_cmpxchg_acqb(&rwmtx->mtxb.flgs, - ETHR_RWMTX_W_FLG__, 0); + else { + res = EBUSY; + break; + } } while (act != 0); break; } +#ifdef ETHR_MTX_CHK_EXCL + if (res == 0) { + ETHR_MTX_CHK_EXCL_SET_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(&rwmtx->mtxb); + } +#endif + ETHR_MTX_HARD_DEBUG_LFS_TRYRWLOCK(&rwmtx->mtxb, res); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); @@ -2485,6 +2597,8 @@ ethr_rwmutex_rwlock(ethr_rwmutex *rwmtx) break; } + ETHR_MTX_CHK_EXCL_SET_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_LFS_RWLOCK(&rwmtx->mtxb); ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); @@ -2501,12 +2615,15 @@ ethr_rwmutex_rwunlock(ethr_rwmutex *rwmtx) ETHR_MTX_HARD_DEBUG_FENCE_CHK(rwmtx); ETHR_MTX_HARD_DEBUG_LFS_RWUNLOCK(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_IS_NOT_NON_EXCL(&rwmtx->mtxb); + ETHR_MTX_CHK_EXCL_UNSET_EXCL(&rwmtx->mtxb); + switch (rwmtx->type) { case ETHR_RWMUTEX_TYPE_NORMAL: act = ethr_atomic_cmpxchg_relb(&rwmtx->mtxb.flgs, 0, ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) - rwmutex_unlock_wake(rwmtx, 1, act); + rwmutex_unlock_wake(rwmtx, 1, act, 0); break; case ETHR_RWMUTEX_TYPE_FREQUENT_READ: @@ -2514,7 +2631,7 @@ ethr_rwmutex_rwunlock(ethr_rwmutex *rwmtx) act = ethr_atomic_cmpxchg_relb(&rwmtx->mtxb.flgs, 0, ETHR_RWMTX_W_FLG__); if (act != ETHR_RWMTX_W_FLG__) - rwmutex_unlock_wake(rwmtx, 1, act); + rwmutex_unlock_wake(rwmtx, 1, act, 0); break; } -- cgit v1.2.3