aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-03-12 11:50:42 +0100
committerSverker Eriksson <[email protected]>2018-05-08 14:38:41 +0200
commit3ab4ac0371b8646246c8b029dd89f39c3a6981b4 (patch)
tree2a1f87590829ec2a82d16f6afe2d604f0e8fa300 /erts
parentc26bfdd48c8deabe9cc0f67badb0d8a95a641845 (diff)
downloadotp-3ab4ac0371b8646246c8b029dd89f39c3a6981b4.tar.gz
otp-3ab4ac0371b8646246c8b029dd89f39c3a6981b4.tar.bz2
otp-3ab4ac0371b8646246c8b029dd89f39c3a6981b4.zip
erts: Make atomic ets:delete_all_objects yield
by using a cooperative strategy that will make any process accessing the table execute delelete_all_objects_continue until the table is empty. This is not an optimal solution as concurrent threads will still block on the table lock, but at least thread progress is made.
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/beam/bif.tab4
-rw-r--r--erts/emulator/beam/erl_bif_info.c9
-rw-r--r--erts/emulator/beam/erl_db.c554
-rw-r--r--erts/emulator/beam/erl_db.h4
-rw-r--r--erts/emulator/beam/erl_db_hash.c19
-rw-r--r--erts/emulator/beam/erl_db_hash.h3
-rw-r--r--erts/emulator/beam/erl_db_tree.c10
-rw-r--r--erts/emulator/beam/erl_db_util.h12
8 files changed, 368 insertions, 247 deletions
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index 687fd39d58..1276048317 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -341,7 +341,6 @@ bif ets:internal_request_all/0
bif ets:new/2
bif ets:delete/1
bif ets:delete/2
-bif ets:delete_all_objects/1
bif ets:delete_object/2
bif ets:first/1
bif ets:is_compiled_ms/1
@@ -372,7 +371,6 @@ bif ets:select_count/2
bif ets:select_reverse/1
bif ets:select_reverse/2
bif ets:select_reverse/3
-bif ets:select_delete/2
bif ets:select_replace/2
bif ets:match_spec_compile/1
bif ets:match_spec_run_r/3
@@ -693,3 +691,5 @@ bif erts_internal:new_connection/1
bif erts_internal:abort_connection/2
bif erts_internal:map_next/3
bif ets:whereis/1
+bif ets:internal_delete_all/2
+bif ets:internal_select_delete/2
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 89e3d3f43e..3c13991fb6 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -4731,7 +4731,14 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
refbin));
}
}
-
+ else if (ERTS_IS_ATOM_STR("ets_force_trap", BIF_ARG_1)) {
+#ifdef ETS_DBG_FORCE_TRAP
+ erts_ets_dbg_force_trap = (BIF_ARG_2 == am_true) ? 1 : 0;
+ BIF_RET(am_ok);
+#else
+ BIF_RET(am_notsup);
+#endif
+ }
}
BIF_ERROR(BIF_P, BADARG);
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index 2d46e81b3b..322f5f1505 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -50,6 +50,34 @@ erts_atomic_t erts_ets_misc_mem_size;
** Utility macros
*/
+#define DB_BIF_GET_TABLE(TB, WHAT, KIND, BIF_IX) \
+ DB_GET_TABLE(TB, BIF_ARG_1, WHAT, KIND, BIF_IX, NULL, BIF_P)
+
+#define DB_TRAP_GET_TABLE(TB, TID, WHAT, KIND, BIF_EXP) \
+ DB_GET_TABLE(TB, TID, WHAT, KIND, 0, BIF_EXP, BIF_P)
+
+#define DB_GET_TABLE(TB, TID, WHAT, KIND, BIF_IX, BIF_EXP, PROC) \
+do { \
+ Uint freason__; \
+ if (!(TB = db_get_table(PROC, TID, WHAT, KIND, &freason__))) { \
+ return db_bif_fail(PROC, freason__, BIF_IX, BIF_EXP); \
+ } \
+}while(0)
+
+static BIF_RETTYPE db_bif_fail(Process* p, Uint freason,
+ Uint bif_ix, Export* bif_exp)
+{
+ if (freason == TRAP) {
+ if (!bif_exp)
+ bif_exp = bif_export[bif_ix];
+ p->arity = bif_exp->info.mfa.arity;
+ p->i = (BeamInstr*) bif_exp->addressv[erts_active_code_ix()];
+ }
+ p->freason = freason;
+ return THE_NON_VALUE;
+}
+
+
/* Get a key from any table structure and a tagged object */
#define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj))
@@ -326,8 +354,7 @@ struct meta_name_tab_entry* meta_name_tab_bucket(Eterm name,
typedef enum {
LCK_READ=1, /* read only access */
LCK_WRITE=2, /* exclusive table write access */
- LCK_WRITE_REC=3, /* record write access */
- LCK_NONE=4
+ LCK_WRITE_REC=3 /* record write access */
} db_lock_kind_t;
extern DbTableMethod db_hash;
@@ -337,9 +364,6 @@ int user_requested_db_max_tabs;
int erts_ets_realloc_always_moves;
int erts_ets_always_compress;
static int db_max_tabs;
-static Eterm ms_delete_all;
-static Eterm ms_delete_all_buff[8]; /* To compare with for deletion
- of all objects */
/*
** Forward decls, static functions
@@ -351,6 +375,7 @@ static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data);
static void free_heir_data(DbTable*);
static SWord free_fixations_locked(Process* p, DbTable *tb);
+static void delete_all_objects_continue(Process* p, DbTable* tb);
static SWord free_table_continue(Process *p, DbTable *tb, SWord reds);
static void print_table(fmtfn_t to, void *to_arg, int show, DbTable* tb);
static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1);
@@ -360,9 +385,9 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1);
static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1);
static Eterm table_info(Process* p, DbTable* tb, Eterm What);
-static BIF_RETTYPE ets_select1(Process* p, Eterm arg1);
-static BIF_RETTYPE ets_select2(Process* p, Eterm arg1, Eterm arg2);
-static BIF_RETTYPE ets_select3(Process* p, Eterm arg1, Eterm arg2, Eterm arg3);
+static BIF_RETTYPE ets_select1(Process* p, int bif_ix, Eterm arg1);
+static BIF_RETTYPE ets_select2(Process* p, DbTable*, Eterm tid, Eterm ms);
+static BIF_RETTYPE ets_select3(Process* p, DbTable*, Eterm tid, Eterm ms, Sint chunk_size);
/*
@@ -636,15 +661,42 @@ static ERTS_INLINE void db_unlock(DbTable* tb, db_lock_kind_t kind)
}
}
+static ERTS_INLINE int db_is_exclusive(DbTable* tb, db_lock_kind_t kind)
+{
+ return kind != LCK_READ && tb->common.is_thread_safe;
+}
+
+static DbTable* handle_lacking_permission(Process* p, DbTable* tb,
+ db_lock_kind_t kind,
+ Uint* freason_p)
+{
+ if (tb->common.status & DB_BUSY) {
+ if (!db_is_exclusive(tb, kind)) {
+ db_unlock(tb, kind);
+ db_lock(tb, LCK_WRITE);
+ }
+ delete_all_objects_continue(p, tb);
+ db_unlock(tb, LCK_WRITE);
+ tb = NULL;
+ *freason_p = TRAP;
+ }
+ else if (p->common.id != tb->common.owner) {
+ db_unlock(tb, kind);
+ tb = NULL;
+ *freason_p = BADARG;
+ }
+ return tb;
+}
+
static ERTS_INLINE
DbTable* db_get_table_aux(Process *p,
Eterm id,
int what,
db_lock_kind_t kind,
- int meta_already_locked)
+ int meta_already_locked,
+ Uint* freason_p)
{
DbTable *tb;
- erts_rwmtx_t *mtl = NULL;
/*
* IMPORTANT: Only scheduler threads are allowed
@@ -654,13 +706,13 @@ DbTable* db_get_table_aux(Process *p,
ASSERT(erts_get_scheduler_data());
if (is_atom(id)) {
+ erts_rwmtx_t *mtl;
struct meta_name_tab_entry* bucket = meta_name_tab_bucket(id,&mtl);
if (!meta_already_locked)
erts_rwmtx_rlock(mtl);
else{
ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(mtl)
|| erts_lc_rwmtx_is_rwlocked(mtl));
- mtl = NULL;
}
tb = NULL;
if (bucket->pu.tb != NULL) {
@@ -679,20 +731,29 @@ DbTable* db_get_table_aux(Process *p,
}
}
}
+ if (!meta_already_locked)
+ erts_rwmtx_runlock(mtl);
}
else
tb = tid2tab(id);
if (tb) {
db_lock(tb, kind);
- if ((tb->common.status & what) == 0
- && p->common.id != tb->common.owner) {
- db_unlock(tb, kind);
- tb = NULL;
- }
+#ifdef ETS_DBG_FORCE_TRAP
+ if (erts_atomic_read_nob(&tb->common.dbg_force_trap) &&
+ erts_atomic_add_read_nob(&tb->common.dbg_force_trap, 2) & 2) {
+ db_unlock(tb, kind);
+ tb = NULL;
+ *freason_p = TRAP;
+ }
+ else
+#endif
+ if (ERTS_UNLIKELY(!(tb->common.status & what)))
+ tb = handle_lacking_permission(p, tb, kind, freason_p);
}
- if (mtl)
- erts_rwmtx_runlock(mtl);
+ else
+ *freason_p = BADARG;
+
return tb;
}
@@ -700,9 +761,10 @@ static ERTS_INLINE
DbTable* db_get_table(Process *p,
Eterm id,
int what,
- db_lock_kind_t kind)
+ db_lock_kind_t kind,
+ Uint* freason_p)
{
- return db_get_table_aux(p, id, what, kind, 0);
+ return db_get_table_aux(p, id, what, kind, 0, freason_p);
}
static int insert_named_tab(Eterm name_atom, DbTable* tb, int have_lock)
@@ -868,9 +930,7 @@ BIF_RETTYPE ets_safe_fixtable_2(BIF_ALIST_2)
#endif
kind = (BIF_ARG_2 == am_true) ? LCK_READ : LCK_WRITE_REC;
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, kind)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, kind, BIF_ets_safe_fixtable_2);
if (BIF_ARG_2 == am_true) {
fix_table_locked(BIF_P, tb);
@@ -900,11 +960,7 @@ BIF_RETTYPE ets_first_1(BIF_ALIST_1)
CHECK_TABLES();
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
-
- if (!tb) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_first_1);
cret = tb->common.meth->db_first(BIF_P, tb, &ret);
@@ -927,11 +983,7 @@ BIF_RETTYPE ets_next_2(BIF_ALIST_2)
CHECK_TABLES();
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
-
- if (!tb) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_next_2);
cret = tb->common.meth->db_next(BIF_P, tb, BIF_ARG_2, &ret);
@@ -954,11 +1006,7 @@ BIF_RETTYPE ets_last_1(BIF_ALIST_1)
CHECK_TABLES();
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
-
- if (!tb) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_last_1);
cret = tb->common.meth->db_last(BIF_P, tb, &ret);
@@ -981,11 +1029,7 @@ BIF_RETTYPE ets_prev_2(BIF_ALIST_2)
CHECK_TABLES();
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ);
-
- if (!tb) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_prev_2);
cret = tb->common.meth->db_prev(BIF_P,tb,BIF_ARG_2,&ret);
@@ -1003,21 +1047,15 @@ BIF_RETTYPE ets_prev_2(BIF_ALIST_2)
BIF_RETTYPE ets_take_2(BIF_ALIST_2)
{
DbTable* tb;
-#ifdef DEBUG
int cret;
-#endif
Eterm ret;
CHECK_TABLES();
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC);
- if (!tb) {
- BIF_ERROR(BIF_P, BADARG);
- }
-#ifdef DEBUG
- cret =
-#endif
- tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret);
- ASSERT(cret == DB_ERROR_NONE);
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2);
+
+ cret = tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret);
+
+ ASSERT(cret == DB_ERROR_NONE); (void)cret;
db_unlock(tb, LCK_WRITE_REC);
BIF_RET(ret);
}
@@ -1035,9 +1073,8 @@ BIF_RETTYPE ets_update_element_3(BIF_ALIST_3)
DeclareTmpHeap(cell,2,BIF_P);
DbUpdateHandle handle;
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3);
+
UseTmpHeap(2,BIF_P);
if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
goto bail_out;
@@ -1108,9 +1145,9 @@ bail_out:
}
static BIF_RETTYPE
-do_update_counter(Process *p, Eterm arg1, Eterm arg2, Eterm arg3, Eterm arg4)
+do_update_counter(Process *p, DbTable* tb,
+ Eterm arg2, Eterm arg3, Eterm arg4)
{
- DbTable* tb;
int cret = DB_ERROR_BADITEM;
Eterm upop_list;
int list_size;
@@ -1126,10 +1163,6 @@ do_update_counter(Process *p, Eterm arg1, Eterm arg2, Eterm arg3, Eterm arg4)
Eterm* hstart;
Eterm* hend;
- if ((tb = db_get_table(p, arg1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
-
UseTmpHeap(5, p);
if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
@@ -1303,7 +1336,11 @@ bail_out:
*/
BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3)
{
- return do_update_counter(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE);
+ DbTable* tb;
+
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_3);
+
+ return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE);
}
/*
@@ -1315,10 +1352,14 @@ BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3)
*/
BIF_RETTYPE ets_update_counter_4(BIF_ALIST_4)
{
+ DbTable* tb;
+
if (is_not_tuple(BIF_ARG_4)) {
BIF_ERROR(BIF_P, BADARG);
}
- return do_update_counter(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_4);
+
+ return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
}
@@ -1339,9 +1380,8 @@ BIF_RETTYPE ets_insert_2(BIF_ALIST_2)
kind = ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL)
? LCK_WRITE : LCK_WRITE_REC);
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_2);
+
if (BIF_ARG_2 == NIL) {
db_unlock(tb, kind);
BIF_RET(am_true);
@@ -1407,11 +1447,9 @@ BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2)
/* More than one object, use LCK_WRITE to keep atomicity */
kind = LCK_WRITE;
- tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind);
- if (tb == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
- meth = tb->common.meth;
+ DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2);
+
+ meth = tb->common.meth;
for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) {
if (is_not_tuple(CAR(list_val(lst)))
|| (arityval(*tuple_val(CAR(list_val(lst))))
@@ -1446,9 +1484,8 @@ BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2)
/* Only one object (or NIL)
*/
kind = LCK_WRITE_REC;
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2);
+
if (BIF_ARG_2 == NIL) {
db_unlock(tb, kind);
BIF_RET(am_true);
@@ -1487,6 +1524,7 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2)
Eterm ret;
Eterm old_name;
erts_rwmtx_t *lck1, *lck2;
+ Uint freason;
#ifdef HARDDEBUG
erts_fprintf(stderr,
@@ -1531,9 +1569,9 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2)
if (lck2)
erts_rwmtx_rwlock(lck2);
- tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1);
+ tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1, &freason);
if (!tb)
- goto badarg;
+ goto fail;
if (is_table_named(tb)) {
if (!insert_named_tab(BIF_ARG_2, tb, 1))
@@ -1553,13 +1591,18 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2)
if (lck2)
erts_rwmtx_rwunlock(lck2);
BIF_RET(ret);
- badarg:
+
+badarg:
+ freason = BADARG;
+
+fail:
if (tb)
db_unlock(tb, LCK_WRITE);
erts_rwmtx_rwunlock(lck1);
if (lck2)
erts_rwmtx_rwunlock(lck2);
- BIF_ERROR(BIF_P, BADARG);
+
+ return db_bif_fail(BIF_P, freason, BIF_ets_rename_2, NULL);
}
@@ -1706,7 +1749,7 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2)
tb->common.meth = meth;
tb->common.the_name = BIF_ARG_1;
tb->common.status = status;
- tb->common.type = status & ERTS_ETS_TABLE_TYPES;
+ tb->common.type = status;
/* Note, 'type' is *read only* from now on... */
erts_refc_init(&tb->common.fix_count, 0);
db_init_lock(tb, status & (DB_FINE_LOCKED|DB_FREQ_READ));
@@ -1718,6 +1761,9 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2)
tb->common.fixing_procs = NULL;
tb->common.compress = is_compressed;
+#ifdef ETS_DBG_FORCE_TRAP
+ erts_atomic_init_nob(&tb->common.dbg_force_trap, erts_ets_dbg_force_trap);
+#endif
cret = meth->db_create(BIF_P, tb);
ASSERT(cret == DB_ERROR_NONE); (void)cret;
@@ -1762,13 +1808,19 @@ BIF_RETTYPE ets_whereis_1(BIF_ALIST_1)
{
DbTable* tb;
Eterm res;
+ Uint freason;
if (is_not_atom(BIF_ARG_1)) {
BIF_ERROR(BIF_P, BADARG);
}
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) {
- BIF_RET(am_undefined);
+ if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
+ if (freason == BADARG)
+ BIF_RET(am_undefined);
+ else {
+ //ToDo: Could we avoid this
+ return db_bif_fail(BIF_P, freason, BIF_ets_whereis_1, NULL);
+ }
}
res = make_tid(BIF_P, tb);
@@ -1788,9 +1840,7 @@ BIF_RETTYPE ets_lookup_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_2);
cret = tb->common.meth->db_get(BIF_P, tb, BIF_ARG_2, &ret);
@@ -1818,9 +1868,7 @@ BIF_RETTYPE ets_member_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_member_2);
cret = tb->common.meth->db_member(tb, BIF_ARG_2, &ret);
@@ -1851,9 +1899,7 @@ BIF_RETTYPE ets_lookup_element_3(BIF_ALIST_3)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_element_3);
if (is_not_small(BIF_ARG_3) || ((index = signed_val(BIF_ARG_3)) < 1)) {
db_unlock(tb, LCK_READ);
@@ -1891,9 +1937,7 @@ BIF_RETTYPE ets_delete_1(BIF_ALIST_1)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_delete_1);
/*
* Clear all access bits to prevent any ets operation to access the
@@ -1966,6 +2010,7 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
Eterm to_pid = BIF_ARG_2;
Eterm from_pid;
DbTable* tb = NULL;
+ Uint freason;
if (!is_internal_pid(to_pid)) {
goto badarg;
@@ -1975,10 +2020,11 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
goto badarg;
}
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL
- || tb->common.owner != BIF_P->common.id) {
- goto badarg;
- }
+ if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, &freason)) == NULL)
+ goto fail;
+ if (tb->common.owner != BIF_P->common.id)
+ goto badarg;
+
from_pid = tb->common.owner;
if (to_pid == from_pid) {
goto badarg; /* or should we be idempotent? return false maybe */
@@ -1997,9 +2043,12 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
BIF_RET(am_true);
badarg:
+ freason = BADARG;
+fail:
if (to_proc != NULL && to_proc != BIF_P) erts_proc_unlock(to_proc, to_locks);
if (tb != NULL) db_unlock(tb, LCK_WRITE);
- BIF_ERROR(BIF_P, BADARG);
+
+ return db_bif_fail(BIF_P, freason, BIF_ets_give_away_3, NULL);
}
BIF_RETTYPE ets_setopts_2(BIF_ALIST_2)
@@ -2050,11 +2099,13 @@ BIF_RETTYPE ets_setopts_2(BIF_ALIST_2)
}
}
- if (tail != NIL
- || (tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL
- || tb->common.owner != BIF_P->common.id) {
+ if (tail != NIL)
+ goto badarg;
+
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_setopts_2);
+
+ if (tb->common.owner != BIF_P->common.id)
goto badarg;
- }
if (heir_data != THE_NON_VALUE) {
free_heir_data(tb);
@@ -2078,23 +2129,84 @@ badarg:
}
/*
-** BIF to erase a whole table and release all memory it holds
-*/
-BIF_RETTYPE ets_delete_all_objects_1(BIF_ALIST_1)
+ * Common for delete_all_objects and select_delete(DeleteAll).
+ */
+BIF_RETTYPE ets_internal_delete_all_2(BIF_ALIST_2)
{
+ SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
+ SWord reds = initial_reds;
+ Eterm nitems;
DbTable* tb;
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_internal_delete_all_2);
+
+ if (BIF_ARG_2 == am_undefined) {
+ nitems = erts_make_integer(erts_atomic_read_nob(&tb->common.nitems),
+ BIF_P);
+
+ reds = tb->common.meth->db_delete_all_objects(BIF_P, tb, reds);
- tb->common.meth->db_delete_all_objects(BIF_P, tb);
+ ASSERT(!(tb->common.status & DB_BUSY));
+
+ if (reds < 0) {
+ /*
+ * Oboy, need to trap AND need to be atomic.
+ * Solved by cooperative trapping where every process trying to
+ * access this table (including this process) will "fail" to lookup
+ * the table and instead pitch in deleting objects
+ * (in delete_all_objects_continue) and then trap to self.
+ */
+ ASSERT((tb->common.status & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC))
+ ==
+ (tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC)));
+ tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
+ tb->common.status |= DB_BUSY;
+ db_unlock(tb, LCK_WRITE);
+ BUMP_ALL_REDS(BIF_P);
+ BIF_TRAP2(bif_export[BIF_ets_internal_delete_all_2], BIF_P,
+ BIF_ARG_1, nitems);
+ }
+ else {
+ /* Done, no trapping needed */
+ BUMP_REDS(BIF_P, (initial_reds - reds));
+ }
+
+ }
+ else {
+ /*
+ * The table lookup succeeded and second argument is nitems
+ * and not 'undefined', which means we have trapped at least once
+ * and are now done.
+ */
+ nitems = BIF_ARG_2;
+ }
db_unlock(tb, LCK_WRITE);
+ BIF_RET(nitems);
+}
- BIF_RET(am_true);
+static void delete_all_objects_continue(Process* p, DbTable* tb)
+{
+ SWord initial_reds = ERTS_BIF_REDS_LEFT(p);
+ SWord reds = initial_reds;
+
+ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
+
+ if ((tb->common.status & (DB_DELETE|DB_BUSY)) != DB_BUSY)
+ return;
+
+ reds = tb->common.meth->db_delete_all_objects(p, tb, reds);
+
+ if (reds < 0) {
+ BUMP_ALL_REDS(p);
+ }
+ else {
+ tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC);
+ tb->common.status &= ~DB_BUSY;
+ BUMP_REDS(p, (initial_reds - reds));
+ }
}
/*
@@ -2110,9 +2222,7 @@ BIF_RETTYPE ets_delete_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_2);
cret = tb->common.meth->db_erase(tb,BIF_ARG_2,&ret);
@@ -2139,9 +2249,8 @@ BIF_RETTYPE ets_delete_object_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_object_2);
+
if (is_not_tuple(BIF_ARG_2) ||
(arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) {
db_unlock(tb, LCK_WRITE_REC);
@@ -2174,15 +2283,14 @@ static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1)
Eterm ret;
Eterm *tptr;
db_lock_kind_t kind = LCK_WRITE_REC;
-
+
CHECK_TABLES();
ASSERT(is_tuple(a1));
tptr = tuple_val(a1);
ASSERT(arityval(*tptr) >= 1);
- if ((tb = db_get_table(p, tptr[1], DB_WRITE, kind)) == NULL) {
- BIF_ERROR(p,BADARG);
- }
+ DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
+ &ets_select_delete_continue_exp);
cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret);
@@ -2206,7 +2314,10 @@ static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1)
}
-BIF_RETTYPE ets_select_delete_2(BIF_ALIST_2)
+/*
+ * ets:select_delete/2 without special case for "delete-all".
+ */
+BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2)
{
BIF_RETTYPE result;
DbTable* tb;
@@ -2216,20 +2327,8 @@ BIF_RETTYPE ets_select_delete_2(BIF_ALIST_2)
CHECK_TABLES();
- if(eq(BIF_ARG_2, ms_delete_all)) {
- int nitems;
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
- nitems = erts_atomic_read_nob(&tb->common.nitems);
- tb->common.meth->db_delete_all_objects(BIF_P, tb);
- db_unlock(tb, LCK_WRITE);
- BIF_RET(erts_make_integer(nitems,BIF_P));
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_internal_select_delete_2);
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
safety = ITERATION_SAFETY(BIF_P,tb);
if (safety == ITER_UNSAFE) {
local_fix_table(tb);
@@ -2521,9 +2620,8 @@ BIF_RETTYPE ets_slot_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_slot_2);
+
/* The slot number is checked in table specific code. */
cret = tb->common.meth->db_slot(BIF_P, tb, BIF_ARG_2, &ret);
db_unlock(tb, LCK_READ);
@@ -2543,41 +2641,53 @@ BIF_RETTYPE ets_slot_2(BIF_ALIST_2)
BIF_RETTYPE ets_match_1(BIF_ALIST_1)
{
- return ets_select1(BIF_P, BIF_ARG_1);
+ return ets_select1(BIF_P, BIF_ets_match_1, BIF_ARG_1);
}
BIF_RETTYPE ets_match_2(BIF_ALIST_2)
{
+ DbTable* tb;
Eterm ms;
DeclareTmpHeap(buff,8,BIF_P);
Eterm *hp = buff;
Eterm res;
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_2);
+
UseTmpHeap(8,BIF_P);
ms = CONS(hp, am_DollarDollar, NIL);
hp += 2;
ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
hp += 4;
ms = CONS(hp, ms, NIL);
- res = ets_select2(BIF_P, BIF_ARG_1, ms);
+ res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
UnUseTmpHeap(8,BIF_P);
return res;
}
BIF_RETTYPE ets_match_3(BIF_ALIST_3)
{
+ DbTable* tb;
Eterm ms;
+ Sint chunk_size;
DeclareTmpHeap(buff,8,BIF_P);
Eterm *hp = buff;
Eterm res;
+ /* Chunk size strictly greater than 0 */
+ if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_3);
+
UseTmpHeap(8,BIF_P);
ms = CONS(hp, am_DollarDollar, NIL);
hp += 2;
ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
hp += 4;
ms = CONS(hp, ms, NIL);
- res = ets_select3(BIF_P, BIF_ARG_1, ms, BIF_ARG_3);
+ res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
UnUseTmpHeap(8,BIF_P);
return res;
}
@@ -2585,34 +2695,35 @@ BIF_RETTYPE ets_match_3(BIF_ALIST_3)
BIF_RETTYPE ets_select_3(BIF_ALIST_3)
{
- return ets_select3(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
+ DbTable* tb;
+ Sint chunk_size;
+
+ /* Chunk size strictly greater than 0 */
+ if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_3);
+
+ return ets_select3(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, chunk_size);
}
static BIF_RETTYPE
-ets_select3(Process* p, Eterm arg1, Eterm arg2, Eterm arg3)
+ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size)
{
BIF_RETTYPE result;
- DbTable* tb;
int cret;
Eterm ret;
- Sint chunk_size;
enum DbIterSafety safety;
CHECK_TABLES();
- /* Chunk size strictly greater than 0 */
- if (is_not_small(arg3) || (chunk_size = signed_val(arg3)) <= 0) {
- BIF_ERROR(p, BADARG);
- }
- if ((tb = db_get_table(p, arg1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
safety = ITERATION_SAFETY(p,tb);
if (safety == ITER_UNSAFE) {
local_fix_table(tb);
}
- cret = tb->common.meth->db_select_chunk(p, tb, arg1,
- arg2, chunk_size,
+ cret = tb->common.meth->db_select_chunk(p, tb, tid,
+ ms, chunk_size,
0 /* not reversed */,
&ret);
if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
@@ -2658,9 +2769,8 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1)
tptr = tuple_val(a1);
ASSERT(arityval(*tptr) >= 1);
- if ((tb = db_get_table(p, tptr[1], DB_READ, kind)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
+ DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
+ &ets_select_continue_exp);
cret = tb->common.meth->db_select_continue(p, tb, a1,
&ret);
@@ -2690,10 +2800,10 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1)
BIF_RETTYPE ets_select_1(BIF_ALIST_1)
{
- return ets_select1(BIF_P, BIF_ARG_1);
+ return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1);
}
-static BIF_RETTYPE ets_select1(Process *p, Eterm arg1)
+static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1)
{
BIF_RETTYPE result;
DbTable* tb;
@@ -2715,10 +2825,10 @@ static BIF_RETTYPE ets_select1(Process *p, Eterm arg1)
BIF_ERROR(p, BADARG);
}
tptr = tuple_val(arg1);
- if (arityval(*tptr) < 1 ||
- (tb = db_get_table(p, tptr[1], DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
+ if (arityval(*tptr) < 1)
+ BIF_ERROR(p, BADARG);
+
+ DB_GET_TABLE(tb, tptr[1], DB_READ, LCK_READ, bif_ix, NULL, p);
safety = ITERATION_SAFETY(p,tb);
if (safety == ITER_UNSAFE) {
@@ -2754,33 +2864,27 @@ static BIF_RETTYPE ets_select1(Process *p, Eterm arg1)
BIF_RETTYPE ets_select_2(BIF_ALIST_2)
{
- return ets_select2(BIF_P, BIF_ARG_1, BIF_ARG_2);
+ DbTable* tb;
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2);
+ return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2);
}
static BIF_RETTYPE
-ets_select2(Process* p, Eterm arg1, Eterm arg2)
+ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms)
{
BIF_RETTYPE result;
- DbTable* tb;
int cret;
enum DbIterSafety safety;
Eterm ret;
CHECK_TABLES();
- /*
- * Make sure that the table exists.
- */
-
- if ((tb = db_get_table(p, arg1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
safety = ITERATION_SAFETY(p,tb);
if (safety == ITER_UNSAFE) {
local_fix_table(tb);
}
- cret = tb->common.meth->db_select(p, tb, arg1, arg2, 0, &ret);
+ cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret);
if (DID_TRAP(p,ret) && safety != ITER_SAFE) {
fix_table_locked(p, tb);
@@ -2823,9 +2927,9 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1)
tptr = tuple_val(a1);
ASSERT(arityval(*tptr) >= 1);
- if ((tb = db_get_table(p, tptr[1], DB_READ, kind)) == NULL) {
- BIF_ERROR(p, BADARG);
- }
+
+ DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind,
+ &ets_select_count_continue_exp);
cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret);
@@ -2860,13 +2964,9 @@ BIF_RETTYPE ets_select_count_2(BIF_ALIST_2)
Eterm ret;
CHECK_TABLES();
- /*
- * Make sure that the table exists.
- */
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_count_2);
+
safety = ITERATION_SAFETY(BIF_P,tb);
if (safety == ITER_UNSAFE) {
local_fix_table(tb);
@@ -2916,9 +3016,8 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1)
tptr = tuple_val(a1);
ASSERT(arityval(*tptr) >= 1);
- if ((tb = db_get_table(p, tptr[1], DB_WRITE, kind)) == NULL) {
- BIF_ERROR(p,BADARG);
- }
+ DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind,
+ &ets_select_replace_continue_exp);
cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret);
@@ -2952,9 +3051,7 @@ BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2)
CHECK_TABLES();
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_select_replace_2);
if (tb->common.status & DB_BAG) {
/* Bag implementation presented both semantic consistency
@@ -3005,13 +3102,8 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3)
Sint chunk_size;
CHECK_TABLES();
- /*
- * Make sure that the table exists.
- */
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_3);
/* Chunk size strictly greater than 0 */
if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
@@ -3049,7 +3141,7 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3)
BIF_RETTYPE ets_select_reverse_1(BIF_ALIST_1)
{
- return ets_select1(BIF_P, BIF_ARG_1);
+ return ets_select1(BIF_P, BIF_ets_select_reverse_1, BIF_ARG_1);
}
BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2)
@@ -3061,13 +3153,9 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2)
Eterm ret;
CHECK_TABLES();
- /*
- * Make sure that the table exists.
- */
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) {
- BIF_ERROR(BIF_P, BADARG);
- }
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_2);
+
safety = ITERATION_SAFETY(BIF_P,tb);
if (safety == ITER_UNSAFE) {
local_fix_table(tb);
@@ -3099,45 +3187,63 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2)
/*
-** ets:match_object(Continuation), ets:match_object(Table, Pattern), ets:match_object(Table,Pattern,ChunkSize)
+** ets:match_object(Continuation)
*/
BIF_RETTYPE ets_match_object_1(BIF_ALIST_1)
{
- return ets_select1(BIF_P, BIF_ARG_1);
+ return ets_select1(BIF_P, BIF_ets_match_object_1, BIF_ARG_1);
}
+/*
+** ets:match_object(Table, Pattern)
+*/
BIF_RETTYPE ets_match_object_2(BIF_ALIST_2)
{
+ DbTable* tb;
Eterm ms;
DeclareTmpHeap(buff,8,BIF_P);
Eterm *hp = buff;
Eterm res;
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_2);
+
UseTmpHeap(8,BIF_P);
ms = CONS(hp, am_DollarUnderscore, NIL);
hp += 2;
ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
hp += 4;
ms = CONS(hp, ms, NIL);
- res = ets_select2(BIF_P, BIF_ARG_1, ms);
+ res = ets_select2(BIF_P, tb, BIF_ARG_1, ms);
UnUseTmpHeap(8,BIF_P);
return res;
}
+/*
+** ets:match_object(Table,Pattern,ChunkSize)
+*/
BIF_RETTYPE ets_match_object_3(BIF_ALIST_3)
{
+ DbTable* tb;
+ Sint chunk_size;
Eterm ms;
DeclareTmpHeap(buff,8,BIF_P);
Eterm *hp = buff;
Eterm res;
+ /* Chunk size strictly greater than 0 */
+ if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) {
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
+ DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_3);
+
UseTmpHeap(8,BIF_P);
ms = CONS(hp, am_DollarUnderscore, NIL);
hp += 2;
ms = TUPLE3(hp, BIF_ARG_2, NIL, ms);
hp += 4;
ms = CONS(hp, ms, NIL);
- res = ets_select3(BIF_P, BIF_ARG_1, ms, BIF_ARG_3);
+ res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size);
UnUseTmpHeap(8,BIF_P);
return res;
}
@@ -3158,16 +3264,17 @@ BIF_RETTYPE ets_info_1(BIF_ALIST_1)
Eterm res;
int i;
Eterm* hp;
+ Uint freason;
/*Process* rp = NULL;*/
/* If/when we implement lockless private tables:
Eterm owner;
*/
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) {
- if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) {
+ if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
+ if (freason == BADARG && (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)))
BIF_RET(am_undefined);
- }
- BIF_ERROR(BIF_P, BADARG);
+ else
+ return db_bif_fail(BIF_P, freason, BIF_ets_info_1, NULL);
}
/* If/when we implement lockless private tables:
@@ -3224,12 +3331,13 @@ BIF_RETTYPE ets_info_2(BIF_ALIST_2)
{
DbTable* tb;
Eterm ret = THE_NON_VALUE;
+ Uint freason;
- if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) {
- if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) {
+ if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) {
+ if (freason == BADARG && (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)))
BIF_RET(am_undefined);
- }
- BIF_ERROR(BIF_P, BADARG);
+ else
+ return db_bif_fail(BIF_P, freason, BIF_ets_info_2, NULL);
}
ret = table_info(BIF_P, tb, BIF_ARG_2);
db_unlock(tb, LCK_READ);
@@ -3317,7 +3425,6 @@ int erts_ets_rwmtx_spin_count = -1;
void init_db(ErtsDbSpinCount db_spin_count)
{
int i;
- Eterm *hp;
unsigned bits;
size_t size;
@@ -3420,13 +3527,6 @@ void init_db(ErtsDbSpinCount db_spin_count)
erts_init_trap_export(&ets_delete_continue_exp,
am_ets, am_atom_put("delete_trap",11), 1,
&ets_delete_trap);
-
- hp = ms_delete_all_buff;
- ms_delete_all = CONS(hp, am_true, NIL);
- hp += 2;
- ms_delete_all = TUPLE3(hp,am_Underscore,NIL,ms_delete_all);
- hp +=4;
- ms_delete_all = CONS(hp, ms_delete_all,NIL);
}
void
@@ -3788,7 +3888,8 @@ unlocked:
erts_rwmtx_runlock(&tb->common.rwlock);
erts_rwmtx_rwlock(&tb->common.rwlock);
*kind_p = LCK_WRITE;
- if (tb->common.status & DB_DELETE) return;
+ if (tb->common.status & (DB_DELETE|DB_BUSY))
+ return;
}
db_unfix_table_hash(&(tb->hash));
}
@@ -4318,5 +4419,8 @@ void erts_lcnt_update_db_locks(int enable) {
erts_schedule_multi_misc_aux_work(0, erts_no_schedulers,
&lcnt_update_db_locks_per_sched, (void*)(UWord)enable);
}
-
#endif /* ERTS_ENABLE_LOCK_COUNT */
+
+#ifdef ETS_DBG_FORCE_TRAP
+erts_aint_t erts_ets_dbg_force_trap = 0;
+#endif
diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h
index eb6da2c9fb..db86c81914 100644
--- a/erts/emulator/beam/erl_db.h
+++ b/erts/emulator/beam/erl_db.h
@@ -135,6 +135,10 @@ void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable);
void erts_lcnt_update_db_locks(int enable);
#endif
+#ifdef ETS_DBG_FORCE_TRAP
+extern erts_aint_t erts_ets_dbg_force_trap;
+#endif
+
#endif /* ERL_DB_H__ */
#if defined(ERTS_WANT_DB_INTERNAL__) && !defined(ERTS_HAVE_DB_INTERNAL__)
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index cb5c496e90..8bbd0cd9a3 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -411,7 +411,7 @@ static void db_foreach_offheap_hash(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
-static int db_delete_all_objects_hash(Process* p, DbTable* tbl);
+static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
@@ -2255,7 +2255,7 @@ void db_initialize_hash(void)
}
-int db_mark_all_deleted_hash(DbTable *tbl)
+static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm* list;
@@ -2270,10 +2270,11 @@ int db_mark_all_deleted_hash(DbTable *tbl)
list->hvalue = INVALID_HASH;
list = list->next;
}while(list != NULL);
+ reds--;
}
}
erts_atomic_set_nob(&tb->common.nitems, 0);
- return DB_ERROR_NONE;
+ return reds < 0 ? 0 : reds; /* ToDo: Yield! */
}
@@ -3073,16 +3074,20 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
return;
}
-static int db_delete_all_objects_hash(Process* p, DbTable* tbl)
+static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
if (IS_FIXED(tbl)) {
- db_mark_all_deleted_hash(tbl);
+ /* ToDo: Yield! */
+ reds = db_mark_all_deleted_hash(tbl, reds);
} else {
- db_free_table_hash(tbl);
+ reds = db_free_table_continue_hash(tbl, reds);
+ if (reds < 0)
+ return reds;
+
db_create_hash(p, tbl);
erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
}
- return 0;
+ return reds;
}
void db_foreach_offheap_hash(DbTable *tbl,
diff --git a/erts/emulator/beam/erl_db_hash.h b/erts/emulator/beam/erl_db_hash.h
index 7d27609825..5f88170ced 100644
--- a/erts/emulator/beam/erl_db_hash.h
+++ b/erts/emulator/beam/erl_db_hash.h
@@ -86,9 +86,6 @@ int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret);
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret);
-/* not yet in method table */
-int db_mark_all_deleted_hash(DbTable *tbl);
-
typedef struct {
float avg_chain_len;
float std_dev_chain_len;
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 5a276b9d88..5ef5451289 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -436,7 +436,7 @@ static void db_foreach_offheap_tree(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
-static int db_delete_all_objects_tree(Process* p, DbTable* tbl);
+static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds);
#ifdef HARDDEBUG
static void db_check_table_tree(DbTable *tbl);
@@ -2023,12 +2023,14 @@ static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds)
return reds;
}
-static int db_delete_all_objects_tree(Process* p, DbTable* tbl)
+static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds)
{
- db_free_table_tree(tbl);
+ reds = db_free_table_continue_tree(tbl, reds);
+ if (reds < 0)
+ return reds;
db_create_tree(p, tbl);
erts_atomic_set_nob(&tbl->tree.common.nitems, 0);
- return 0;
+ return reds;
}
static void do_db_tree_foreach_offheap(TreeDbTerm *,
diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h
index 6b126f35d6..db8dfa5be4 100644
--- a/erts/emulator/beam/erl_db_util.h
+++ b/erts/emulator/beam/erl_db_util.h
@@ -32,6 +32,7 @@
** DMC_DEBUG does NOT need DEBUG, but DEBUG needs DMC_DEBUG
*/
#define DMC_DEBUG 1
+#define ETS_DBG_FORCE_TRAP 1
#endif
/*
@@ -180,8 +181,7 @@ typedef struct db_table_method
Eterm* ret);
int (*db_take)(Process *, DbTable *, Eterm, Eterm *);
- int (*db_delete_all_objects)(Process* p,
- DbTable* db /* [in out] */ );
+ SWord (*db_delete_all_objects)(Process* p, DbTable* db, SWord reds);
int (*db_free_table)(DbTable* db /* [in out] */ );
SWord (*db_free_table_continue)(DbTable* db, SWord reds);
@@ -267,6 +267,10 @@ typedef struct db_table_common {
Uint32 status; /* bit masks defined below */
int keypos; /* defaults to 1 */
int compress;
+
+#ifdef ETS_DBG_FORCE_TRAP
+ erts_atomic_t dbg_force_trap; /* &1 force enabled, &2 trap this call */
+#endif
} DbTableCommon;
/* These are status bit patterns */
@@ -281,9 +285,7 @@ typedef struct db_table_common {
#define DB_FINE_LOCKED (1 << 8) /* write_concurrency */
#define DB_FREQ_READ (1 << 9) /* read_concurrency */
#define DB_NAMED_TABLE (1 << 10)
-
-#define ERTS_ETS_TABLE_TYPES (DB_BAG|DB_SET|DB_DUPLICATE_BAG|DB_ORDERED_SET\
- |DB_FINE_LOCKED|DB_FREQ_READ|DB_NAMED_TABLE)
+#define DB_BUSY (1 << 11)
#define IS_HASH_TABLE(Status) (!!((Status) & \
(DB_BAG | DB_SET | DB_DUPLICATE_BAG)))