diff options
author | Sverker Eriksson <[email protected]> | 2018-05-18 14:33:52 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-05-18 14:33:52 +0200 |
commit | 6024dea0b587518aa222fa198f007d7e069a89b1 (patch) | |
tree | 44720659fa2061061095ad17956fda651af12a9d | |
parent | a01ffa40c55380a9749c17458d0156443e415b5e (diff) | |
parent | e7579f608a1bdb52271c57e9ad69a7752ea1371b (diff) | |
download | otp-6024dea0b587518aa222fa198f007d7e069a89b1.tar.gz otp-6024dea0b587518aa222fa198f007d7e069a89b1.tar.bz2 otp-6024dea0b587518aa222fa198f007d7e069a89b1.zip |
Merge branch 'sverker/ets-delete_all_objects-trap/OTP-15078'
* sverker/ets-delete_all_objects-trap/OTP-15078:
erts: Rename untrapping db_free_*empty*_table
erts: Make ets:delete_all_objects yield on fixed table
erts: Optimize ets delete all in fixed table
erts: Refactor ets select iteration code
erts: Cleanup ets code
erts: Optimize ets hash object deallocactions
erts: Refactor pseudo deleted ets objects
erts: Make atomic ets:delete_all_objects yield
erts: Fix reduction bump for ets:delete/1
-rw-r--r-- | bootstrap/lib/stdlib/ebin/ets.beam | bin | 22280 -> 22460 bytes | |||
-rw-r--r-- | erts/emulator/beam/bif.tab | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 9 | ||||
-rw-r--r-- | erts/emulator/beam/erl_binary.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db.c | 577 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db.h | 4 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_hash.c | 956 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_hash.h | 20 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 60 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_util.c | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_util.h | 16 | ||||
-rw-r--r-- | lib/stdlib/src/ets.erl | 26 | ||||
-rw-r--r-- | lib/stdlib/test/ets_SUITE.erl | 58 |
13 files changed, 999 insertions, 737 deletions
diff --git a/bootstrap/lib/stdlib/ebin/ets.beam b/bootstrap/lib/stdlib/ebin/ets.beam Binary files differindex 3d103b1624..ab4996ef4e 100644 --- a/bootstrap/lib/stdlib/ebin/ets.beam +++ b/bootstrap/lib/stdlib/ebin/ets.beam diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 21dda9ff03..7548924178 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -342,7 +342,6 @@ bif ets:internal_request_all/0 bif ets:new/2 bif ets:delete/1 bif ets:delete/2 -bif ets:delete_all_objects/1 bif ets:delete_object/2 bif ets:first/1 bif ets:is_compiled_ms/1 @@ -373,7 +372,6 @@ bif ets:select_count/2 bif ets:select_reverse/1 bif ets:select_reverse/2 bif ets:select_reverse/3 -bif ets:select_delete/2 bif ets:select_replace/2 bif ets:match_spec_compile/1 bif ets:match_spec_run_r/3 @@ -698,3 +696,5 @@ bif erts_internal:gather_alloc_histograms/1 bif erts_internal:gather_carrier_info/1 ubif erlang:map_get/2 ubif erlang:is_map_key/2 +bif ets:internal_delete_all/2 +bif ets:internal_select_delete/2 diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 55450e397f..8b2b1a58c7 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -4682,7 +4682,14 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) refbin)); } } - + else if (ERTS_IS_ATOM_STR("ets_force_trap", BIF_ARG_1)) { +#ifdef ETS_DBG_FORCE_TRAP + erts_ets_dbg_force_trap = (BIF_ARG_2 == am_true) ? 1 : 0; + BIF_RET(am_ok); +#else + BIF_RET(am_notsup); +#endif + } } BIF_ERROR(BIF_P, BADARG); diff --git a/erts/emulator/beam/erl_binary.h b/erts/emulator/beam/erl_binary.h index 46653a8580..7dfd0c273a 100644 --- a/erts/emulator/beam/erl_binary.h +++ b/erts/emulator/beam/erl_binary.h @@ -146,9 +146,7 @@ typedef union { /* A "magic" binary flag */ #define BIN_FLAG_MAGIC 1 -#define BIN_FLAG_USR1 2 /* Reserved for use by different modules too mark */ -#define BIN_FLAG_USR2 4 /* certain binaries as special (used by ets) */ -#define BIN_FLAG_DRV 8 +#define BIN_FLAG_DRV 2 #endif /* ERL_BINARY_H__TYPES__ */ diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index ca2ebb7c27..3a29f8cf56 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -50,6 +50,34 @@ erts_atomic_t erts_ets_misc_mem_size; ** Utility macros */ +#define DB_BIF_GET_TABLE(TB, WHAT, KIND, BIF_IX) \ + DB_GET_TABLE(TB, BIF_ARG_1, WHAT, KIND, BIF_IX, NULL, BIF_P) + +#define DB_TRAP_GET_TABLE(TB, TID, WHAT, KIND, BIF_EXP) \ + DB_GET_TABLE(TB, TID, WHAT, KIND, 0, BIF_EXP, BIF_P) + +#define DB_GET_TABLE(TB, TID, WHAT, KIND, BIF_IX, BIF_EXP, PROC) \ +do { \ + Uint freason__; \ + if (!(TB = db_get_table(PROC, TID, WHAT, KIND, &freason__))) { \ + return db_bif_fail(PROC, freason__, BIF_IX, BIF_EXP); \ + } \ +}while(0) + +static BIF_RETTYPE db_bif_fail(Process* p, Uint freason, + Uint bif_ix, Export* bif_exp) +{ + if (freason == TRAP) { + if (!bif_exp) + bif_exp = bif_export[bif_ix]; + p->arity = bif_exp->info.mfa.arity; + p->i = (BeamInstr*) bif_exp->addressv[erts_active_code_ix()]; + } + p->freason = freason; + return THE_NON_VALUE; +} + + /* Get a key from any table structure and a tagged object */ #define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj)) @@ -326,8 +354,7 @@ struct meta_name_tab_entry* meta_name_tab_bucket(Eterm name, typedef enum { LCK_READ=1, /* read only access */ LCK_WRITE=2, /* exclusive table write access */ - LCK_WRITE_REC=3, /* record write access */ - LCK_NONE=4 + LCK_WRITE_REC=3 /* record write access */ } db_lock_kind_t; extern DbTableMethod db_hash; @@ -337,9 +364,6 @@ int user_requested_db_max_tabs; int erts_ets_realloc_always_moves; int erts_ets_always_compress; static int db_max_tabs; -static Eterm ms_delete_all; -static Eterm ms_delete_all_buff[8]; /* To compare with for deletion - of all objects */ /* ** Forward decls, static functions @@ -351,18 +375,19 @@ static void set_heir(Process* me, DbTable* tb, Eterm heir, UWord heir_data); static void free_heir_data(DbTable*); static SWord free_fixations_locked(Process* p, DbTable *tb); +static void delete_all_objects_continue(Process* p, DbTable* tb); static SWord free_table_continue(Process *p, DbTable *tb, SWord reds); static void print_table(fmtfn_t to, void *to_arg, int show, DbTable* tb); -static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1); +static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1); static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1); static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1); static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1); static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1); static Eterm table_info(Process* p, DbTable* tb, Eterm What); -static BIF_RETTYPE ets_select1(Process* p, Eterm arg1); -static BIF_RETTYPE ets_select2(Process* p, Eterm arg1, Eterm arg2); -static BIF_RETTYPE ets_select3(Process* p, Eterm arg1, Eterm arg2, Eterm arg3); +static BIF_RETTYPE ets_select1(Process* p, int bif_ix, Eterm arg1); +static BIF_RETTYPE ets_select2(Process* p, DbTable*, Eterm tid, Eterm ms); +static BIF_RETTYPE ets_select3(Process* p, DbTable*, Eterm tid, Eterm ms, Sint chunk_size); /* @@ -636,15 +661,42 @@ static ERTS_INLINE void db_unlock(DbTable* tb, db_lock_kind_t kind) } } +static ERTS_INLINE int db_is_exclusive(DbTable* tb, db_lock_kind_t kind) +{ + return kind != LCK_READ && tb->common.is_thread_safe; +} + +static DbTable* handle_lacking_permission(Process* p, DbTable* tb, + db_lock_kind_t kind, + Uint* freason_p) +{ + if (tb->common.status & DB_BUSY) { + if (!db_is_exclusive(tb, kind)) { + db_unlock(tb, kind); + db_lock(tb, LCK_WRITE); + } + delete_all_objects_continue(p, tb); + db_unlock(tb, LCK_WRITE); + tb = NULL; + *freason_p = TRAP; + } + else if (p->common.id != tb->common.owner) { + db_unlock(tb, kind); + tb = NULL; + *freason_p = BADARG; + } + return tb; +} + static ERTS_INLINE DbTable* db_get_table_aux(Process *p, Eterm id, int what, db_lock_kind_t kind, - int meta_already_locked) + int meta_already_locked, + Uint* freason_p) { DbTable *tb; - erts_rwmtx_t *mtl = NULL; /* * IMPORTANT: Only scheduler threads are allowed @@ -654,13 +706,13 @@ DbTable* db_get_table_aux(Process *p, ASSERT(erts_get_scheduler_data()); if (is_atom(id)) { + erts_rwmtx_t *mtl; struct meta_name_tab_entry* bucket = meta_name_tab_bucket(id,&mtl); if (!meta_already_locked) erts_rwmtx_rlock(mtl); else{ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(mtl) || erts_lc_rwmtx_is_rwlocked(mtl)); - mtl = NULL; } tb = NULL; if (bucket->pu.tb != NULL) { @@ -679,20 +731,29 @@ DbTable* db_get_table_aux(Process *p, } } } + if (!meta_already_locked) + erts_rwmtx_runlock(mtl); } else tb = tid2tab(id); if (tb) { db_lock(tb, kind); - if ((tb->common.status & what) == 0 - && p->common.id != tb->common.owner) { - db_unlock(tb, kind); - tb = NULL; - } +#ifdef ETS_DBG_FORCE_TRAP + if (erts_atomic_read_nob(&tb->common.dbg_force_trap) && + erts_atomic_add_read_nob(&tb->common.dbg_force_trap, 2) & 2) { + db_unlock(tb, kind); + tb = NULL; + *freason_p = TRAP; + } + else +#endif + if (ERTS_UNLIKELY(!(tb->common.status & what))) + tb = handle_lacking_permission(p, tb, kind, freason_p); } - if (mtl) - erts_rwmtx_runlock(mtl); + else + *freason_p = BADARG; + return tb; } @@ -700,9 +761,10 @@ static ERTS_INLINE DbTable* db_get_table(Process *p, Eterm id, int what, - db_lock_kind_t kind) + db_lock_kind_t kind, + Uint* freason_p) { - return db_get_table_aux(p, id, what, kind, 0); + return db_get_table_aux(p, id, what, kind, 0, freason_p); } static int insert_named_tab(Eterm name_atom, DbTable* tb, int have_lock) @@ -868,9 +930,7 @@ BIF_RETTYPE ets_safe_fixtable_2(BIF_ALIST_2) #endif kind = (BIF_ARG_2 == am_true) ? LCK_READ : LCK_WRITE_REC; - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, kind)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, kind, BIF_ets_safe_fixtable_2); if (BIF_ARG_2 == am_true) { fix_table_locked(BIF_P, tb); @@ -900,11 +960,7 @@ BIF_RETTYPE ets_first_1(BIF_ALIST_1) CHECK_TABLES(); - tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ); - - if (!tb) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_first_1); cret = tb->common.meth->db_first(BIF_P, tb, &ret); @@ -927,11 +983,7 @@ BIF_RETTYPE ets_next_2(BIF_ALIST_2) CHECK_TABLES(); - tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ); - - if (!tb) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_next_2); cret = tb->common.meth->db_next(BIF_P, tb, BIF_ARG_2, &ret); @@ -954,11 +1006,7 @@ BIF_RETTYPE ets_last_1(BIF_ALIST_1) CHECK_TABLES(); - tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ); - - if (!tb) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_last_1); cret = tb->common.meth->db_last(BIF_P, tb, &ret); @@ -981,11 +1029,7 @@ BIF_RETTYPE ets_prev_2(BIF_ALIST_2) CHECK_TABLES(); - tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ); - - if (!tb) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_prev_2); cret = tb->common.meth->db_prev(BIF_P,tb,BIF_ARG_2,&ret); @@ -1003,21 +1047,15 @@ BIF_RETTYPE ets_prev_2(BIF_ALIST_2) BIF_RETTYPE ets_take_2(BIF_ALIST_2) { DbTable* tb; -#ifdef DEBUG int cret; -#endif Eterm ret; CHECK_TABLES(); - tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC); - if (!tb) { - BIF_ERROR(BIF_P, BADARG); - } -#ifdef DEBUG - cret = -#endif - tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret); - ASSERT(cret == DB_ERROR_NONE); + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_take_2); + + cret = tb->common.meth->db_take(BIF_P, tb, BIF_ARG_2, &ret); + + ASSERT(cret == DB_ERROR_NONE); (void)cret; db_unlock(tb, LCK_WRITE_REC); BIF_RET(ret); } @@ -1035,9 +1073,8 @@ BIF_RETTYPE ets_update_element_3(BIF_ALIST_3) DeclareTmpHeap(cell,2,BIF_P); DbUpdateHandle handle; - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3); + UseTmpHeap(2,BIF_P); if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) { goto bail_out; @@ -1108,9 +1145,9 @@ bail_out: } static BIF_RETTYPE -do_update_counter(Process *p, Eterm arg1, Eterm arg2, Eterm arg3, Eterm arg4) +do_update_counter(Process *p, DbTable* tb, + Eterm arg2, Eterm arg3, Eterm arg4) { - DbTable* tb; int cret = DB_ERROR_BADITEM; Eterm upop_list; int list_size; @@ -1126,10 +1163,6 @@ do_update_counter(Process *p, Eterm arg1, Eterm arg2, Eterm arg3, Eterm arg4) Eterm* hstart; Eterm* hend; - if ((tb = db_get_table(p, arg1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(p, BADARG); - } - UseTmpHeap(5, p); if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) { @@ -1303,7 +1336,11 @@ bail_out: */ BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3) { - return do_update_counter(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE); + DbTable* tb; + + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_3); + + return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, THE_NON_VALUE); } /* @@ -1315,10 +1352,14 @@ BIF_RETTYPE ets_update_counter_3(BIF_ALIST_3) */ BIF_RETTYPE ets_update_counter_4(BIF_ALIST_4) { + DbTable* tb; + if (is_not_tuple(BIF_ARG_4)) { BIF_ERROR(BIF_P, BADARG); } - return do_update_counter(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_counter_4); + + return do_update_counter(BIF_P, tb, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); } @@ -1339,9 +1380,8 @@ BIF_RETTYPE ets_insert_2(BIF_ALIST_2) kind = ((is_list(BIF_ARG_2) && CDR(list_val(BIF_ARG_2)) != NIL) ? LCK_WRITE : LCK_WRITE_REC); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_2); + if (BIF_ARG_2 == NIL) { db_unlock(tb, kind); BIF_RET(am_true); @@ -1407,11 +1447,9 @@ BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2) /* More than one object, use LCK_WRITE to keep atomicity */ kind = LCK_WRITE; - tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind); - if (tb == NULL) { - BIF_ERROR(BIF_P, BADARG); - } - meth = tb->common.meth; + DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2); + + meth = tb->common.meth; for (lst = BIF_ARG_2; is_list(lst); lst = CDR(list_val(lst))) { if (is_not_tuple(CAR(list_val(lst))) || (arityval(*tuple_val(CAR(list_val(lst)))) @@ -1446,9 +1484,8 @@ BIF_RETTYPE ets_insert_new_2(BIF_ALIST_2) /* Only one object (or NIL) */ kind = LCK_WRITE_REC; - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, kind)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, kind, BIF_ets_insert_new_2); + if (BIF_ARG_2 == NIL) { db_unlock(tb, kind); BIF_RET(am_true); @@ -1487,6 +1524,7 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) Eterm ret; Eterm old_name; erts_rwmtx_t *lck1, *lck2; + Uint freason; #ifdef HARDDEBUG erts_fprintf(stderr, @@ -1531,9 +1569,9 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) if (lck2) erts_rwmtx_rwlock(lck2); - tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1); + tb = db_get_table_aux(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, 1, &freason); if (!tb) - goto badarg; + goto fail; if (is_table_named(tb)) { if (!insert_named_tab(BIF_ARG_2, tb, 1)) @@ -1553,13 +1591,18 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) if (lck2) erts_rwmtx_rwunlock(lck2); BIF_RET(ret); - badarg: + +badarg: + freason = BADARG; + +fail: if (tb) db_unlock(tb, LCK_WRITE); erts_rwmtx_rwunlock(lck1); if (lck2) erts_rwmtx_rwunlock(lck2); - BIF_ERROR(BIF_P, BADARG); + + return db_bif_fail(BIF_P, freason, BIF_ets_rename_2, NULL); } @@ -1580,9 +1623,7 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) Sint keypos; int is_named, is_compressed; int is_fine_locked, frequent_read; -#ifdef DEBUG int cret; -#endif DbTableMethod* meth; if (is_not_atom(BIF_ARG_1)) { @@ -1708,7 +1749,7 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) tb->common.meth = meth; tb->common.the_name = BIF_ARG_1; tb->common.status = status; - tb->common.type = status & ERTS_ETS_TABLE_TYPES; + tb->common.type = status; /* Note, 'type' is *read only* from now on... */ erts_refc_init(&tb->common.fix_count, 0); db_init_lock(tb, status & (DB_FINE_LOCKED|DB_FREQ_READ)); @@ -1720,12 +1761,12 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) tb->common.fixing_procs = NULL; tb->common.compress = is_compressed; - -#ifdef DEBUG - cret = +#ifdef ETS_DBG_FORCE_TRAP + erts_atomic_init_nob(&tb->common.dbg_force_trap, erts_ets_dbg_force_trap); #endif - meth->db_create(BIF_P, tb); - ASSERT(cret == DB_ERROR_NONE); + + cret = meth->db_create(BIF_P, tb); + ASSERT(cret == DB_ERROR_NONE); (void)cret; make_btid(tb); @@ -1741,7 +1782,7 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) db_lock(tb,LCK_WRITE); free_heir_data(tb); - tb->common.meth->db_free_table(tb); + tb->common.meth->db_free_empty_table(tb); db_unlock(tb,LCK_WRITE); table_dec_refc(tb, 0); BIF_ERROR(BIF_P, BADARG); @@ -1767,13 +1808,19 @@ BIF_RETTYPE ets_whereis_1(BIF_ALIST_1) { DbTable* tb; Eterm res; + Uint freason; if (is_not_atom(BIF_ARG_1)) { BIF_ERROR(BIF_P, BADARG); } - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) { - BIF_RET(am_undefined); + if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) { + if (freason == BADARG) + BIF_RET(am_undefined); + else { + //ToDo: Could we avoid this + return db_bif_fail(BIF_P, freason, BIF_ets_whereis_1, NULL); + } } res = make_tid(BIF_P, tb); @@ -1793,9 +1840,7 @@ BIF_RETTYPE ets_lookup_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_2); cret = tb->common.meth->db_get(BIF_P, tb, BIF_ARG_2, &ret); @@ -1823,9 +1868,7 @@ BIF_RETTYPE ets_member_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_member_2); cret = tb->common.meth->db_member(tb, BIF_ARG_2, &ret); @@ -1856,9 +1899,7 @@ BIF_RETTYPE ets_lookup_element_3(BIF_ALIST_3) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_lookup_element_3); if (is_not_small(BIF_ARG_3) || ((index = signed_val(BIF_ARG_3)) < 1)) { db_unlock(tb, LCK_READ); @@ -1896,9 +1937,7 @@ BIF_RETTYPE ets_delete_1(BIF_ALIST_1) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_delete_1); /* * Clear all access bits to prevent any ets operation to access the @@ -1941,7 +1980,8 @@ BIF_RETTYPE ets_delete_1(BIF_ALIST_1) reds -= free_fixations_locked(BIF_P, tb); db_unlock(tb, LCK_WRITE); - if (free_table_continue(BIF_P, tb, reds) < 0) { + reds = free_table_continue(BIF_P, tb, reds); + if (reds < 0) { /* * Package the DbTable* pointer into a bignum so that it can be safely * passed through a trap. We used to pass the DbTable* pointer directly @@ -1970,6 +2010,7 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3) Eterm to_pid = BIF_ARG_2; Eterm from_pid; DbTable* tb = NULL; + Uint freason; if (!is_internal_pid(to_pid)) { goto badarg; @@ -1979,10 +2020,11 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3) goto badarg; } - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL - || tb->common.owner != BIF_P->common.id) { - goto badarg; - } + if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE, &freason)) == NULL) + goto fail; + if (tb->common.owner != BIF_P->common.id) + goto badarg; + from_pid = tb->common.owner; if (to_pid == from_pid) { goto badarg; /* or should we be idempotent? return false maybe */ @@ -2001,9 +2043,12 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3) BIF_RET(am_true); badarg: + freason = BADARG; +fail: if (to_proc != NULL && to_proc != BIF_P) erts_proc_unlock(to_proc, to_locks); if (tb != NULL) db_unlock(tb, LCK_WRITE); - BIF_ERROR(BIF_P, BADARG); + + return db_bif_fail(BIF_P, freason, BIF_ets_give_away_3, NULL); } BIF_RETTYPE ets_setopts_2(BIF_ALIST_2) @@ -2054,11 +2099,13 @@ BIF_RETTYPE ets_setopts_2(BIF_ALIST_2) } } - if (tail != NIL - || (tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL - || tb->common.owner != BIF_P->common.id) { + if (tail != NIL) + goto badarg; + + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_setopts_2); + + if (tb->common.owner != BIF_P->common.id) goto badarg; - } if (heir_data != THE_NON_VALUE) { free_heir_data(tb); @@ -2082,23 +2129,84 @@ badarg: } /* -** BIF to erase a whole table and release all memory it holds -*/ -BIF_RETTYPE ets_delete_all_objects_1(BIF_ALIST_1) + * Common for delete_all_objects and select_delete(DeleteAll). + */ +BIF_RETTYPE ets_internal_delete_all_2(BIF_ALIST_2) { + SWord initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + SWord reds = initial_reds; + Eterm nitems; DbTable* tb; CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE, BIF_ets_internal_delete_all_2); - tb->common.meth->db_delete_all_objects(BIF_P, tb); + if (BIF_ARG_2 == am_undefined) { + nitems = erts_make_integer(erts_atomic_read_nob(&tb->common.nitems), + BIF_P); + + reds = tb->common.meth->db_delete_all_objects(BIF_P, tb, reds); + + ASSERT(!(tb->common.status & DB_BUSY)); + + if (reds < 0) { + /* + * Oboy, need to trap AND need to be atomic. + * Solved by cooperative trapping where every process trying to + * access this table (including this process) will "fail" to lookup + * the table and instead pitch in deleting objects + * (in delete_all_objects_continue) and then trap to self. + */ + ASSERT((tb->common.status & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC)) + == + (tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC))); + tb->common.status &= ~(DB_PRIVATE|DB_PROTECTED|DB_PUBLIC); + tb->common.status |= DB_BUSY; + db_unlock(tb, LCK_WRITE); + BUMP_ALL_REDS(BIF_P); + BIF_TRAP2(bif_export[BIF_ets_internal_delete_all_2], BIF_P, + BIF_ARG_1, nitems); + } + else { + /* Done, no trapping needed */ + BUMP_REDS(BIF_P, (initial_reds - reds)); + } + + } + else { + /* + * The table lookup succeeded and second argument is nitems + * and not 'undefined', which means we have trapped at least once + * and are now done. + */ + nitems = BIF_ARG_2; + } db_unlock(tb, LCK_WRITE); + BIF_RET(nitems); +} - BIF_RET(am_true); +static void delete_all_objects_continue(Process* p, DbTable* tb) +{ + SWord initial_reds = ERTS_BIF_REDS_LEFT(p); + SWord reds = initial_reds; + + ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)); + + if ((tb->common.status & (DB_DELETE|DB_BUSY)) != DB_BUSY) + return; + + reds = tb->common.meth->db_delete_all_objects(p, tb, reds); + + if (reds < 0) { + BUMP_ALL_REDS(p); + } + else { + tb->common.status |= tb->common.type & (DB_PRIVATE|DB_PROTECTED|DB_PUBLIC); + tb->common.status &= ~DB_BUSY; + BUMP_REDS(p, (initial_reds - reds)); + } } /* @@ -2114,9 +2222,7 @@ BIF_RETTYPE ets_delete_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_2); cret = tb->common.meth->db_erase(tb,BIF_ARG_2,&ret); @@ -2143,9 +2249,8 @@ BIF_RETTYPE ets_delete_object_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_delete_object_2); + if (is_not_tuple(BIF_ARG_2) || (arityval(*tuple_val(BIF_ARG_2)) < tb->common.keypos)) { db_unlock(tb, LCK_WRITE_REC); @@ -2168,7 +2273,7 @@ BIF_RETTYPE ets_delete_object_2(BIF_ALIST_2) /* ** This is for trapping, cannot be called directly. */ -static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1) +static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1) { Process *p = BIF_P; Eterm a1 = BIF_ARG_1; @@ -2178,15 +2283,14 @@ static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_WRITE_REC; - + CHECK_TABLES(); ASSERT(is_tuple(a1)); tptr = tuple_val(a1); ASSERT(arityval(*tptr) >= 1); - if ((tb = db_get_table(p, tptr[1], DB_WRITE, kind)) == NULL) { - BIF_ERROR(p,BADARG); - } + DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, + &ets_select_delete_continue_exp); cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret); @@ -2210,7 +2314,10 @@ static BIF_RETTYPE ets_select_delete_1(BIF_ALIST_1) } -BIF_RETTYPE ets_select_delete_2(BIF_ALIST_2) +/* + * ets:select_delete/2 without special case for "delete-all". + */ +BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2) { BIF_RETTYPE result; DbTable* tb; @@ -2220,20 +2327,8 @@ BIF_RETTYPE ets_select_delete_2(BIF_ALIST_2) CHECK_TABLES(); - if(eq(BIF_ARG_2, ms_delete_all)) { - int nitems; - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } - nitems = erts_atomic_read_nob(&tb->common.nitems); - tb->common.meth->db_delete_all_objects(BIF_P, tb); - db_unlock(tb, LCK_WRITE); - BIF_RET(erts_make_integer(nitems,BIF_P)); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_internal_select_delete_2); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } safety = ITERATION_SAFETY(BIF_P,tb); if (safety == ITER_UNSAFE) { local_fix_table(tb); @@ -2525,9 +2620,8 @@ BIF_RETTYPE ets_slot_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_slot_2); + /* The slot number is checked in table specific code. */ cret = tb->common.meth->db_slot(BIF_P, tb, BIF_ARG_2, &ret); db_unlock(tb, LCK_READ); @@ -2547,41 +2641,53 @@ BIF_RETTYPE ets_slot_2(BIF_ALIST_2) BIF_RETTYPE ets_match_1(BIF_ALIST_1) { - return ets_select1(BIF_P, BIF_ARG_1); + return ets_select1(BIF_P, BIF_ets_match_1, BIF_ARG_1); } BIF_RETTYPE ets_match_2(BIF_ALIST_2) { + DbTable* tb; Eterm ms; DeclareTmpHeap(buff,8,BIF_P); Eterm *hp = buff; Eterm res; + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_2); + UseTmpHeap(8,BIF_P); ms = CONS(hp, am_DollarDollar, NIL); hp += 2; ms = TUPLE3(hp, BIF_ARG_2, NIL, ms); hp += 4; ms = CONS(hp, ms, NIL); - res = ets_select2(BIF_P, BIF_ARG_1, ms); + res = ets_select2(BIF_P, tb, BIF_ARG_1, ms); UnUseTmpHeap(8,BIF_P); return res; } BIF_RETTYPE ets_match_3(BIF_ALIST_3) { + DbTable* tb; Eterm ms; + Sint chunk_size; DeclareTmpHeap(buff,8,BIF_P); Eterm *hp = buff; Eterm res; + /* Chunk size strictly greater than 0 */ + if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) { + BIF_ERROR(BIF_P, BADARG); + } + + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_3); + UseTmpHeap(8,BIF_P); ms = CONS(hp, am_DollarDollar, NIL); hp += 2; ms = TUPLE3(hp, BIF_ARG_2, NIL, ms); hp += 4; ms = CONS(hp, ms, NIL); - res = ets_select3(BIF_P, BIF_ARG_1, ms, BIF_ARG_3); + res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size); UnUseTmpHeap(8,BIF_P); return res; } @@ -2589,34 +2695,35 @@ BIF_RETTYPE ets_match_3(BIF_ALIST_3) BIF_RETTYPE ets_select_3(BIF_ALIST_3) { - return ets_select3(BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); + DbTable* tb; + Sint chunk_size; + + /* Chunk size strictly greater than 0 */ + if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) { + BIF_ERROR(BIF_P, BADARG); + } + + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_3); + + return ets_select3(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, chunk_size); } static BIF_RETTYPE -ets_select3(Process* p, Eterm arg1, Eterm arg2, Eterm arg3) +ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size) { BIF_RETTYPE result; - DbTable* tb; int cret; Eterm ret; - Sint chunk_size; enum DbIterSafety safety; CHECK_TABLES(); - /* Chunk size strictly greater than 0 */ - if (is_not_small(arg3) || (chunk_size = signed_val(arg3)) <= 0) { - BIF_ERROR(p, BADARG); - } - if ((tb = db_get_table(p, arg1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(p, BADARG); - } safety = ITERATION_SAFETY(p,tb); if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_chunk(p, tb, arg1, - arg2, chunk_size, + cret = tb->common.meth->db_select_chunk(p, tb, tid, + ms, chunk_size, 0 /* not reversed */, &ret); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { @@ -2662,9 +2769,8 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) tptr = tuple_val(a1); ASSERT(arityval(*tptr) >= 1); - if ((tb = db_get_table(p, tptr[1], DB_READ, kind)) == NULL) { - BIF_ERROR(p, BADARG); - } + DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, + &ets_select_continue_exp); cret = tb->common.meth->db_select_continue(p, tb, a1, &ret); @@ -2694,10 +2800,10 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) BIF_RETTYPE ets_select_1(BIF_ALIST_1) { - return ets_select1(BIF_P, BIF_ARG_1); + return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1); } -static BIF_RETTYPE ets_select1(Process *p, Eterm arg1) +static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) { BIF_RETTYPE result; DbTable* tb; @@ -2719,10 +2825,10 @@ static BIF_RETTYPE ets_select1(Process *p, Eterm arg1) BIF_ERROR(p, BADARG); } tptr = tuple_val(arg1); - if (arityval(*tptr) < 1 || - (tb = db_get_table(p, tptr[1], DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(p, BADARG); - } + if (arityval(*tptr) < 1) + BIF_ERROR(p, BADARG); + + DB_GET_TABLE(tb, tptr[1], DB_READ, LCK_READ, bif_ix, NULL, p); safety = ITERATION_SAFETY(p,tb); if (safety == ITER_UNSAFE) { @@ -2758,33 +2864,27 @@ static BIF_RETTYPE ets_select1(Process *p, Eterm arg1) BIF_RETTYPE ets_select_2(BIF_ALIST_2) { - return ets_select2(BIF_P, BIF_ARG_1, BIF_ARG_2); + DbTable* tb; + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2); + return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2); } static BIF_RETTYPE -ets_select2(Process* p, Eterm arg1, Eterm arg2) +ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms) { BIF_RETTYPE result; - DbTable* tb; int cret; enum DbIterSafety safety; Eterm ret; CHECK_TABLES(); - /* - * Make sure that the table exists. - */ - - if ((tb = db_get_table(p, arg1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(p, BADARG); - } safety = ITERATION_SAFETY(p,tb); if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select(p, tb, arg1, arg2, 0, &ret); + cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); @@ -2827,9 +2927,9 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1) tptr = tuple_val(a1); ASSERT(arityval(*tptr) >= 1); - if ((tb = db_get_table(p, tptr[1], DB_READ, kind)) == NULL) { - BIF_ERROR(p, BADARG); - } + + DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, + &ets_select_count_continue_exp); cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret); @@ -2864,13 +2964,9 @@ BIF_RETTYPE ets_select_count_2(BIF_ALIST_2) Eterm ret; CHECK_TABLES(); - /* - * Make sure that the table exists. - */ - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_count_2); + safety = ITERATION_SAFETY(BIF_P,tb); if (safety == ITER_UNSAFE) { local_fix_table(tb); @@ -2920,9 +3016,8 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1) tptr = tuple_val(a1); ASSERT(arityval(*tptr) >= 1); - if ((tb = db_get_table(p, tptr[1], DB_WRITE, kind)) == NULL) { - BIF_ERROR(p,BADARG); - } + DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, + &ets_select_replace_continue_exp); cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret); @@ -2956,9 +3051,7 @@ BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2) CHECK_TABLES(); - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_WRITE, LCK_WRITE_REC)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_select_replace_2); if (tb->common.status & DB_BAG) { /* Bag implementation presented both semantic consistency @@ -3009,13 +3102,8 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3) Sint chunk_size; CHECK_TABLES(); - /* - * Make sure that the table exists. - */ - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_3); /* Chunk size strictly greater than 0 */ if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) { @@ -3053,7 +3141,7 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3) BIF_RETTYPE ets_select_reverse_1(BIF_ALIST_1) { - return ets_select1(BIF_P, BIF_ARG_1); + return ets_select1(BIF_P, BIF_ets_select_reverse_1, BIF_ARG_1); } BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2) @@ -3065,13 +3153,9 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2) Eterm ret; CHECK_TABLES(); - /* - * Make sure that the table exists. - */ - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_READ, LCK_READ)) == NULL) { - BIF_ERROR(BIF_P, BADARG); - } + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_reverse_2); + safety = ITERATION_SAFETY(BIF_P,tb); if (safety == ITER_UNSAFE) { local_fix_table(tb); @@ -3103,45 +3187,63 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2) /* -** ets:match_object(Continuation), ets:match_object(Table, Pattern), ets:match_object(Table,Pattern,ChunkSize) +** ets:match_object(Continuation) */ BIF_RETTYPE ets_match_object_1(BIF_ALIST_1) { - return ets_select1(BIF_P, BIF_ARG_1); + return ets_select1(BIF_P, BIF_ets_match_object_1, BIF_ARG_1); } +/* +** ets:match_object(Table, Pattern) +*/ BIF_RETTYPE ets_match_object_2(BIF_ALIST_2) { + DbTable* tb; Eterm ms; DeclareTmpHeap(buff,8,BIF_P); Eterm *hp = buff; Eterm res; + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_2); + UseTmpHeap(8,BIF_P); ms = CONS(hp, am_DollarUnderscore, NIL); hp += 2; ms = TUPLE3(hp, BIF_ARG_2, NIL, ms); hp += 4; ms = CONS(hp, ms, NIL); - res = ets_select2(BIF_P, BIF_ARG_1, ms); + res = ets_select2(BIF_P, tb, BIF_ARG_1, ms); UnUseTmpHeap(8,BIF_P); return res; } +/* +** ets:match_object(Table,Pattern,ChunkSize) +*/ BIF_RETTYPE ets_match_object_3(BIF_ALIST_3) { + DbTable* tb; + Sint chunk_size; Eterm ms; DeclareTmpHeap(buff,8,BIF_P); Eterm *hp = buff; Eterm res; + /* Chunk size strictly greater than 0 */ + if (is_not_small(BIF_ARG_3) || (chunk_size = signed_val(BIF_ARG_3)) <= 0) { + BIF_ERROR(BIF_P, BADARG); + } + + DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_match_object_3); + UseTmpHeap(8,BIF_P); ms = CONS(hp, am_DollarUnderscore, NIL); hp += 2; ms = TUPLE3(hp, BIF_ARG_2, NIL, ms); hp += 4; ms = CONS(hp, ms, NIL); - res = ets_select3(BIF_P, BIF_ARG_1, ms, BIF_ARG_3); + res = ets_select3(BIF_P, tb, BIF_ARG_1, ms, chunk_size); UnUseTmpHeap(8,BIF_P); return res; } @@ -3162,16 +3264,17 @@ BIF_RETTYPE ets_info_1(BIF_ALIST_1) Eterm res; int i; Eterm* hp; + Uint freason; /*Process* rp = NULL;*/ /* If/when we implement lockless private tables: Eterm owner; */ - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) { - if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) { + if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) { + if (freason == BADARG && (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1))) BIF_RET(am_undefined); - } - BIF_ERROR(BIF_P, BADARG); + else + return db_bif_fail(BIF_P, freason, BIF_ets_info_1, NULL); } /* If/when we implement lockless private tables: @@ -3228,12 +3331,13 @@ BIF_RETTYPE ets_info_2(BIF_ALIST_2) { DbTable* tb; Eterm ret = THE_NON_VALUE; + Uint freason; - if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) { - if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) { + if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ, &freason)) == NULL) { + if (freason == BADARG && (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1))) BIF_RET(am_undefined); - } - BIF_ERROR(BIF_P, BADARG); + else + return db_bif_fail(BIF_P, freason, BIF_ets_info_2, NULL); } ret = table_info(BIF_P, tb, BIF_ARG_2); db_unlock(tb, LCK_READ); @@ -3321,7 +3425,6 @@ int erts_ets_rwmtx_spin_count = -1; void init_db(ErtsDbSpinCount db_spin_count) { int i; - Eterm *hp; unsigned bits; size_t size; @@ -3403,7 +3506,7 @@ void init_db(ErtsDbSpinCount db_spin_count) /* Non visual BIF to trap to. */ erts_init_trap_export(&ets_select_delete_continue_exp, am_ets, am_atom_put("delete_trap",11), 1, - &ets_select_delete_1); + &ets_select_delete_trap_1); /* Non visual BIF to trap to. */ erts_init_trap_export(&ets_select_count_continue_exp, @@ -3424,13 +3527,6 @@ void init_db(ErtsDbSpinCount db_spin_count) erts_init_trap_export(&ets_delete_continue_exp, am_ets, am_atom_put("delete_trap",11), 1, &ets_delete_trap); - - hp = ms_delete_all_buff; - ms_delete_all = CONS(hp, am_true, NIL); - hp += 2; - ms_delete_all = TUPLE3(hp,am_Underscore,NIL,ms_delete_all); - hp +=4; - ms_delete_all = CONS(hp, ms_delete_all,NIL); } void @@ -3792,7 +3888,8 @@ unlocked: erts_rwmtx_runlock(&tb->common.rwlock); erts_rwmtx_rwlock(&tb->common.rwlock); *kind_p = LCK_WRITE; - if (tb->common.status & DB_DELETE) return; + if (tb->common.status & (DB_DELETE|DB_BUSY)) + return; } db_unfix_table_hash(&(tb->hash)); } @@ -3940,7 +4037,8 @@ static BIF_RETTYPE ets_delete_trap(BIF_ALIST_1) ASSERT(*ptr == make_pos_bignum_header(1)); - if (free_table_continue(BIF_P, tb, reds) < 0) { + reds = free_table_continue(BIF_P, tb, reds); + if (reds < 0) { BUMP_ALL_REDS(BIF_P); BIF_TRAP1(&ets_delete_continue_exp, BIF_P, cont); } @@ -4321,5 +4419,8 @@ void erts_lcnt_update_db_locks(int enable) { erts_schedule_multi_misc_aux_work(0, erts_no_schedulers, &lcnt_update_db_locks_per_sched, (void*)(UWord)enable); } - #endif /* ERTS_ENABLE_LOCK_COUNT */ + +#ifdef ETS_DBG_FORCE_TRAP +erts_aint_t erts_ets_dbg_force_trap = 0; +#endif diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h index eb6da2c9fb..db86c81914 100644 --- a/erts/emulator/beam/erl_db.h +++ b/erts/emulator/beam/erl_db.h @@ -135,6 +135,10 @@ void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable); void erts_lcnt_update_db_locks(int enable); #endif +#ifdef ETS_DBG_FORCE_TRAP +extern erts_aint_t erts_ets_dbg_force_trap; +#endif + #endif /* ERL_DB_H__ */ #if defined(ERTS_WANT_DB_INTERNAL__) && !defined(ERTS_HAVE_DB_INTERNAL__) diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index cb5c496e90..74d63325e6 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -21,6 +21,7 @@ /* ** Implementation of unordered ETS tables. ** The tables are implemented as linear dynamic hash tables. +** https://en.wikipedia.org/wiki/Linear_hashing */ /* SMP: @@ -148,20 +149,14 @@ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval) return ix; } -/* Remember a slot containing a pseudo-deleted item (INVALID_HASH) - * Return false if we got raced by unfixing thread - * and the object should be deleted for real. - */ -static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix, - erts_aint_t fixated_by_me) + +static ERTS_INLINE int link_fixdel(DbTableHash* tb, + FixedDeletion* fixd, + erts_aint_t fixated_by_me) { erts_aint_t was_next; erts_aint_t exp_next; - FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); - fixd->slot = ix; + was_next = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic insertion in linked list: */ if (NFIXED(tb) <= fixated_by_me) { @@ -178,14 +173,33 @@ static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix, return 1; } +/* Remember a slot containing a pseudo-deleted item + * Return false if we got raced by unfixing thread + * and the object should be deleted for real. + */ +static int add_fixed_deletion(DbTableHash* tb, int ix, + erts_aint_t fixated_by_me) +{ + FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, + (DbTable *) tb, + sizeof(FixedDeletion)); + ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); + fixd->slot = ix; + fixd->all = 0; + return link_fixdel(tb, fixd, fixated_by_me); +} + + +static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p) +{ + return p->pseudo_deleted; +} -#define MAX_HASH 0xEFFFFFFFUL -#define INVALID_HASH 0xFFFFFFFFUL /* optimised version of make_hash (normal case? atomic key) */ #define MAKE_HASH(term) \ ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ - make_internal_hash(term, 0)) % MAX_HASH) + make_internal_hash(term, 0)) & MAX_HASH_MASK) # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1) # define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck) @@ -270,17 +284,22 @@ static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix, } -/* - * Some special binary flags - */ -#define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1 - static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p) { db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm)); } +static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p) +{ + while (p) { + HashDbTerm* next = p->next; + free_term(tb, p); + p = next; + } +} + + /* * Local types */ @@ -290,9 +309,6 @@ struct mp_prefound { }; struct mp_info { - int all_objects; /* True if complete objects are always - * returned from the match_spec (can use - * copy_shallow on the return value) */ int something_can_match; /* The match_spec is not "impossible" */ int key_given; struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */ @@ -402,7 +418,7 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl); -static int db_free_table_hash(DbTable *tbl); +static int db_free_empty_table_hash(DbTable *tbl); static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds); @@ -411,7 +427,7 @@ static void db_foreach_offheap_hash(DbTable *, void (*)(ErlOffHeap *, void *), void *); -static int db_delete_all_objects_hash(Process* p, DbTable* tbl); +static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds); #ifdef HARDDEBUG static void db_check_table_hash(DbTableHash *tb); #endif @@ -436,7 +452,8 @@ static ERTS_INLINE void try_shrink(DbTableHash* tb) static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { - if (b->hvalue != hval) return 0; + if (b->hvalue != hval || is_pseudo_deleted(b)) + return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); @@ -449,7 +466,8 @@ static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b, static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { - if (b->hvalue != hval && b->hvalue != INVALID_HASH) return 0; + if (b->hvalue != hval) + return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); @@ -513,7 +531,7 @@ DbTableMethod db_hash = db_select_replace_continue_hash, db_take_hash, db_delete_all_objects_hash, - db_free_table_hash, + db_free_empty_table_hash, db_free_table_continue_hash, db_print_hash, db_foreach_offheap_hash, @@ -570,51 +588,61 @@ SWord db_unfix_table_hash(DbTableHash *tb) SWord work = 0; ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock) - || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock) - && !tb->common.is_thread_safe)); + || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock) + && !tb->common.is_thread_safe)); restart: fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel, - (erts_aint_t) NULL); - while (fixdel != NULL) { - FixedDeletion *fx = fixdel; - int ix = fx->slot; - HashDbTerm **bp; - HashDbTerm *b; - erts_rwmtx_t* lck = WLOCK_HASH(tb,ix); - - if (IS_FIXED(tb)) { /* interrupted by fixer */ - WUNLOCK_HASH(lck); - restore_fixdel(tb,fixdel); - if (!IS_FIXED(tb)) { - goto restart; /* unfixed again! */ - } - return work; - } - if (ix < NACTIVE(tb)) { - bp = &BUCKET(tb, ix); - b = *bp; - - while (b != NULL) { - if (b->hvalue == INVALID_HASH) { - *bp = b->next; - free_term(tb, b); - work++; - b = *bp; - } else { - bp = &b->next; - b = b->next; - } - } - } - /* else slot has been joined and purged by shrink() */ - WUNLOCK_HASH(lck); - fixdel = fx->next; - erts_db_free(ERTS_ALC_T_DB_FIX_DEL, - (DbTable *) tb, - (void *) fx, - sizeof(FixedDeletion)); - ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); - work++; + (erts_aint_t) NULL); + while (fixdel) { + FixedDeletion *free_me; + + do { + HashDbTerm **bp; + HashDbTerm *b; + HashDbTerm *free_us = NULL; + erts_rwmtx_t* lck; + + lck = WLOCK_HASH(tb, fixdel->slot); + + if (IS_FIXED(tb)) { /* interrupted by fixer */ + WUNLOCK_HASH(lck); + restore_fixdel(tb,fixdel); + if (!IS_FIXED(tb)) { + goto restart; /* unfixed again! */ + } + return work; + } + if (fixdel->slot < NACTIVE(tb)) { + bp = &BUCKET(tb, fixdel->slot); + b = *bp; + + while (b != NULL) { + if (is_pseudo_deleted(b)) { + HashDbTerm* nxt = b->next; + b->next = free_us; + free_us = b; + work++; + b = *bp = nxt; + } else { + bp = &b->next; + b = b->next; + } + } + } + /* else slot has been joined and purged by shrink() */ + WUNLOCK_HASH(lck); + free_term_list(tb, free_us); + + }while (fixdel->all && fixdel->slot-- > 0); + + free_me = fixdel; + fixdel = fixdel->next; + erts_db_free(ERTS_ALC_T_DB_FIX_DEL, + (DbTable *) tb, + (void *) free_me, + sizeof(FixedDeletion)); + ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); + work++; } /* ToDo: Maybe try grow/shrink the table as well */ @@ -764,8 +792,9 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) */ if (tb->common.status & DB_SET) { HashDbTerm* bnext = b->next; - if (b->hvalue == INVALID_HASH) { + if (is_pseudo_deleted(b)) { erts_atomic_inc_nob(&tb->common.nitems); + b->pseudo_deleted = 0; } else if (key_clash_fail) { ret = DB_ERROR_BADKEY; @@ -773,14 +802,14 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) } q = replace_dbterm(tb, b, obj); q->next = bnext; - q->hvalue = hval; /* In case of INVALID_HASH */ + ASSERT(q->hvalue == hval); *bp = q; goto Ldone; } else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */ q = b; do { - if (q->hvalue != INVALID_HASH) { + if (!is_pseudo_deleted(q)) { ret = DB_ERROR_BADKEY; goto Ldone; } @@ -792,9 +821,10 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) q = b; do { if (db_eq(&tb->common,obj,&q->dbterm)) { - if (q->hvalue == INVALID_HASH) { + if (is_pseudo_deleted(q)) { erts_atomic_inc_nob(&tb->common.nitems); - q->hvalue = hval; + q->pseudo_deleted = 0; + ASSERT(q->hvalue == hval); if (q != b) { /* must move to preserve key insertion order */ *qp = q->next; q->next = b; @@ -812,6 +842,7 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) Lnew: q = new_dbterm(tb, obj); q->hvalue = hval; + q->pseudo_deleted = 0; q->next = b; *bp = q; nitems = erts_atomic_inc_read_nob(&tb->common.nitems); @@ -839,7 +870,7 @@ get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval, if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while (b2 && has_key(tb, b2, key, hval)) { - if (b2->hvalue != INVALID_HASH) + if (!is_pseudo_deleted(b2)) sz += b2->dbterm.size + 2; b2 = b2->next; @@ -935,7 +966,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl, while(b2 != NULL && has_key(tb,b2,key,hval)) { if (ndex > arityval(b2->dbterm.tpl[0]) - && b2->hvalue != INVALID_HASH) { + && !is_pseudo_deleted(b2)) { retval = DB_ERROR_BADITEM; goto done; } @@ -943,7 +974,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl, } b = b1; while(b != b2) { - if (b->hvalue != INVALID_HASH) { + if (!is_pseudo_deleted(b)) { Eterm *hp; Eterm copy = db_copy_element_from_ets(&tb->common, p, &b->dbterm, ndex, &hp, 2); @@ -978,6 +1009,7 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) int ix; HashDbTerm** bp; HashDbTerm* b; + HashDbTerm* free_us = NULL; erts_rwmtx_t* lck; int nitems_diff = 0; @@ -993,16 +1025,17 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) if (nitems_diff == -1 && IS_FIXED(tb) && add_fixed_deletion(tb, ix, 0)) { /* Pseudo remove (no need to keep several of same key) */ - b->hvalue = INVALID_HASH; + b->pseudo_deleted = 1; } else { - *bp = b->next; - free_term(tb, b); - b = *bp; + HashDbTerm* next = b->next; + b->next = free_us; + free_us = b; + b = *bp = next; continue; } } else { - if (nitems_diff && b->hvalue != INVALID_HASH) + if (nitems_diff && !is_pseudo_deleted(b)) break; } bp = &b->next; @@ -1013,6 +1046,7 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } + free_term_list(tb, free_us); *ret = am_true; return DB_ERROR_NONE; } @@ -1027,6 +1061,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) int ix; HashDbTerm** bp; HashDbTerm* b; + HashDbTerm* free_us = NULL; erts_rwmtx_t* lck; int nitems_diff = 0; int nkeys = 0; @@ -1045,13 +1080,14 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) if (db_eq(&tb->common,object, &b->dbterm)) { --nitems_diff; if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) { - b->hvalue = INVALID_HASH; /* Pseudo remove */ + b->pseudo_deleted = 1; bp = &b->next; b = b->next; } else { - *bp = b->next; - free_term(tb, b); - b = *bp; + HashDbTerm* next = b->next; + b->next = free_us; + free_us = b; + b = *bp = next; } if (tb->common.status & (DB_DUPLICATE_BAG)) { continue; @@ -1060,7 +1096,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) } } } - else if (nitems_diff && b->hvalue != INVALID_HASH) { + else if (nitems_diff && !is_pseudo_deleted(b)) { break; } bp = &b->next; @@ -1071,6 +1107,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } + free_term_list(tb, free_us); *ret = am_true; return DB_ERROR_NONE; } @@ -1106,28 +1143,19 @@ static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) /* - * This is just here so I can take care of the return value - * that is to be sent during a trap (the BIF_TRAP macros explicitly returns) - */ -static BIF_RETTYPE bif_trap1(Export *bif, - Process *p, - Eterm p1) -{ - BIF_TRAP1(bif, p, p1); -} - - -/* * Match traversal callbacks */ +typedef struct match_callbacks_t_ match_callbacks_t; +struct match_callbacks_t_ +{ /* Called when no match is possible. * context_ptr: Pointer to context * ret: Pointer to traversal function term return. * * Both the direct return value and 'ret' are used as the traversal function return values. */ -typedef int (*mtraversal_on_nothing_can_match_t)(void* context_ptr, Eterm* ret); + int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret); /* Called for each match result. * context_ptr: Pointer to context @@ -1138,8 +1166,8 @@ typedef int (*mtraversal_on_nothing_can_match_t)(void* context_ptr, Eterm* ret); * * Should return 1 for successful match, 0 otherwise. */ -typedef int (*mtraversal_on_match_res_t)(void* context_ptr, Sint slot_ix, HashDbTerm*** current_ptr_ptr, - Eterm match_res); + int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, Eterm match_res); /* Called when either we've matched enough elements in this cycle or EOT was reached. * context_ptr: Pointer to context @@ -1152,8 +1180,8 @@ typedef int (*mtraversal_on_match_res_t)(void* context_ptr, Sint slot_ix, HashDb * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ -typedef int (*mtraversal_on_loop_ended_t)(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret); + int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret); /* Called when it's time to trap * context_ptr: Pointer to context @@ -1165,7 +1193,11 @@ typedef int (*mtraversal_on_loop_ended_t)(void* context_ptr, Sint slot_ix, Sint * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ -typedef int (*mtraversal_on_trap_t)(void* context_ptr, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret); + int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp, + Eterm* ret); + +}; + /* * Begin hash table match traversal @@ -1178,11 +1210,7 @@ static int match_traverse(Process* p, DbTableHash* tb, Eterm** hpp, /* Heap */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ - mtraversal_on_nothing_can_match_t on_nothing_can_match, - mtraversal_on_match_res_t on_match_res, - mtraversal_on_loop_ended_t on_loop_ended, - mtraversal_on_trap_t on_trap, - void* context_ptr, /* State for callbacks above */ + match_callbacks_t* ctx, Eterm* ret) { Sint slot_ix; /* Slot index */ @@ -1212,14 +1240,10 @@ static int match_traverse(Process* p, DbTableHash* tb, if (!mpi.something_can_match) { /* Can't possibly match anything */ - ret_value = on_nothing_can_match(context_ptr, ret); + ret_value = ctx->on_nothing_can_match(ctx, ret); goto done; } - if (mpi.all_objects) { - mpi.mp->intern.flags |= BIN_FLAG_ALL_OBJECTS; - } - /* * Look for initial slot / bucket */ @@ -1235,7 +1259,8 @@ static int match_traverse(Process* p, DbTableHash* tb, } slot_ix = next_slot_function(tb,slot_ix,&lck); if (slot_ix == 0) { - ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret); + ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, + &mpi.mp, ret); goto done; } } @@ -1253,11 +1278,11 @@ static int match_traverse(Process* p, DbTableHash* tb, */ for(;;) { if (*current_ptr != NULL) { - if ((*current_ptr)->hvalue != INVALID_HASH) { - match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0, + if (!is_pseudo_deleted(*current_ptr)) { + match_res = db_match_dbterm(&tb->common, p, mpi.mp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; - if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) { + if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { ++got; } --iterations_left; @@ -1271,7 +1296,7 @@ static int match_traverse(Process* p, DbTableHash* tb, else if (mpi.key_given) { /* Key is bound */ unlock_hash_function(lck); if (current_list_pos == mpi.num_lists) { - ret_value = on_loop_ended(context_ptr, -1, got, iterations_left, &mpi.mp, ret); + ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret); goto done; } else { slot_ix = mpi.lists[current_list_pos].ix; @@ -1296,18 +1321,18 @@ static int match_traverse(Process* p, DbTableHash* tb, * Since many heap fragments will make the GC slower, trap and GC now. */ unlock_hash_function(lck); - ret_value = on_trap(context_ptr, slot_ix, got, &mpi.mp, ret); + ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret); goto done; } current_ptr = &BUCKET(tb,slot_ix); } } - ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret); + ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret); done: /* We should only jump directly to this label if - * we've already called on_nothing_can_match / on_loop_ended / on_trap + * we've already called ctx->nothing_can_match / loop_ended / trap */ if (mpi.mp != NULL) { erts_bin_free(mpi.mp); @@ -1332,13 +1357,9 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, Binary** mpp, /* Existing match program */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ - mtraversal_on_match_res_t on_match_res, - mtraversal_on_loop_ended_t on_loop_ended, - mtraversal_on_trap_t on_trap, - void* context_ptr, /* For callbacks */ + match_callbacks_t* ctx, Eterm* ret) { - int all_objects = (*mpp)->intern.flags & BIN_FLAG_ALL_OBJECTS; HashDbTerm** current_ptr; /* Refers to either the bucket pointer or * the 'next' pointer in the previous term */ @@ -1362,7 +1383,7 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, || (chunk_size && got >= chunk_size)) { /* Already got all or enough in the match_list */ - ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret); + ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret); goto done; } @@ -1380,11 +1401,11 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, current_ptr = &BUCKET(tb,slot_ix); for(;;) { if (*current_ptr != NULL) { - if ((*current_ptr)->hvalue != INVALID_HASH) { - match_res = db_match_dbterm(&tb->common, p, *mpp, all_objects, + if (!is_pseudo_deleted(*current_ptr)) { + match_res = db_match_dbterm(&tb->common, p, *mpp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; - if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) { + if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { ++got; } --iterations_left; @@ -1410,14 +1431,14 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, * Since many heap fragments will make the GC slower, trap and GC now. */ unlock_hash_function(lck); - ret_value = on_trap(context_ptr, slot_ix, got, mpp, ret); + ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret); goto done; } current_ptr = &BUCKET(tb,slot_ix); } } - ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret); + ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret); done: /* We should only jump directly to this label if @@ -1434,7 +1455,7 @@ done: * as well as their continuation-handling counterparts. */ -static ERTS_INLINE int on_mtraversal_simple_trap(Export* trap_function, +static ERTS_INLINE int on_simple_trap(Export* trap_function, Process* p, DbTableHash* tb, Eterm tid, @@ -1480,16 +1501,16 @@ static ERTS_INLINE int on_mtraversal_simple_trap(Export* trap_function, make_small(slot_ix), mpb, egot); - *ret = bif_trap1(trap_function, p, continuation); + ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation); return DB_ERROR_NONE; } -static ERTS_INLINE int unpack_simple_mtraversal_continuation(Eterm continuation, - Eterm** tptr_ptr, - Eterm* tid_ptr, - Sint* slot_ix_p, - Binary** mpp, - Sint* got_p) +static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, + Eterm** tptr_ptr, + Eterm* tid_ptr, + Sint* slot_ix_p, + Binary** mpp, + Sint* got_p) { Eterm* tptr; ASSERT(is_tuple(continuation)); @@ -1524,6 +1545,7 @@ static ERTS_INLINE int unpack_simple_mtraversal_continuation(Eterm continuation, #define MAX_SELECT_CHUNK_ITERATIONS 1000 typedef struct { + match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; @@ -1531,83 +1553,86 @@ typedef struct { Sint chunk_size; Eterm match_list; Eterm* prev_continuation_tptr; -} mtraversal_select_chunk_context_t; +} select_chunk_context_t; -static int mtraversal_select_chunk_on_nothing_can_match(void* context_ptr, Eterm* ret) { - mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; - *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL); +static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) +{ + select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; + *ret = (ctx->chunk_size > 0 ? am_EOT : NIL); return DB_ERROR_NONE; } -static int mtraversal_select_chunk_on_match_res(void* context_ptr, Sint slot_ix, - HashDbTerm*** current_ptr_ptr, - Eterm match_res) +static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) { - mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; if (is_value(match_res)) { - sc_context_ptr->match_list = CONS(sc_context_ptr->hp, match_res, sc_context_ptr->match_list); + ctx->match_list = CONS(ctx->hp, match_res, ctx->match_list); return 1; } return 0; } -static int mtraversal_select_chunk_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret) +static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, + Eterm* ret) { - mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm mpb; if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) { /* We didn't get to iterate a single time, which means EOT */ - ASSERT(sc_context_ptr->match_list == NIL); - *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL); + ASSERT(ctx->match_list == NIL); + *ret = (ctx->chunk_size > 0 ? am_EOT : NIL); return DB_ERROR_NONE; } else { ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS); - BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); - if (sc_context_ptr->chunk_size) { + BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + if (ctx->chunk_size) { Eterm continuation; Eterm rest = NIL; Sint rest_size = 0; - if (got > sc_context_ptr->chunk_size) { /* Split list in return value and 'rest' */ - Eterm tmp = sc_context_ptr->match_list; - rest = sc_context_ptr->match_list; - while (got-- > sc_context_ptr->chunk_size + 1) { + if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */ + Eterm tmp = ctx->match_list; + rest = ctx->match_list; + while (got-- > ctx->chunk_size + 1) { tmp = CDR(list_val(tmp)); ++rest_size; } ++rest_size; - sc_context_ptr->match_list = CDR(list_val(tmp)); + ctx->match_list = CDR(list_val(tmp)); CDR(list_val(tmp)) = NIL; /* Destructive, the list has never been in 'user space' */ } if (rest != NIL || slot_ix >= 0) { /* Need more calls */ - Eterm tid = sc_context_ptr->tid; - sc_context_ptr->hp = HAllocX(sc_context_ptr->p, - 3 + 7 + ERTS_MAGIC_REF_THING_SIZE, - ERTS_MAGIC_REF_THING_SIZE); - mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &sc_context_ptr->hp); + Eterm tid = ctx->tid; + ctx->hp = HAllocX(ctx->p, + 3 + 7 + ERTS_MAGIC_REF_THING_SIZE, + ERTS_MAGIC_REF_THING_SIZE); + mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp); if (is_atom(tid)) - tid = erts_db_make_tid(sc_context_ptr->p, - &sc_context_ptr->tb->common); + tid = erts_db_make_tid(ctx->p, + &ctx->tb->common); continuation = TUPLE6( - sc_context_ptr->hp, + ctx->hp, tid, make_small(slot_ix), - make_small(sc_context_ptr->chunk_size), + make_small(ctx->chunk_size), mpb, rest, make_small(rest_size)); *mpp = NULL; /* Otherwise the caller will destroy it */ - sc_context_ptr->hp += 7; - *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, continuation); + ctx->hp += 7; + *ret = TUPLE2(ctx->hp, ctx->match_list, continuation); return DB_ERROR_NONE; } else { /* All data is exhausted */ - if (sc_context_ptr->match_list != NIL) { /* No more data to search but still a + if (ctx->match_list != NIL) { /* No more data to search but still a result to return to the caller */ - sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3); - *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, am_EOT); + ctx->hp = HAlloc(ctx->p, 3); + *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { /* Reached the end of the ttable with no data to return */ *ret = am_EOT; @@ -1615,82 +1640,88 @@ static int mtraversal_select_chunk_on_loop_ended(void* context_ptr, Sint slot_ix } } } - *ret = sc_context_ptr->match_list; + *ret = ctx->match_list; return DB_ERROR_NONE; } } -static int mtraversal_select_chunk_on_trap(void* context_ptr, Sint slot_ix, Sint got, - Binary** mpp, Eterm* ret) +static int select_chunk_on_trap(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) { - mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm mpb; Eterm continuation; Eterm* hp; - BUMP_ALL_REDS(sc_context_ptr->p); + BUMP_ALL_REDS(ctx->p); - if (sc_context_ptr->prev_continuation_tptr == NULL) { - Eterm tid = sc_context_ptr->tid; + if (ctx->prev_continuation_tptr == NULL) { + Eterm tid = ctx->tid; /* First time we're trapping */ - hp = HAllocX(sc_context_ptr->p, 7 + ERTS_MAGIC_REF_THING_SIZE, + hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); if (is_atom(tid)) - tid = erts_db_make_tid(sc_context_ptr->p, &sc_context_ptr->tb->common); - mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &hp); + tid = erts_db_make_tid(ctx->p, &ctx->tb->common); + mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp); continuation = TUPLE6( hp, tid, make_small(slot_ix), - make_small(sc_context_ptr->chunk_size), + make_small(ctx->chunk_size), mpb, - sc_context_ptr->match_list, + ctx->match_list, make_small(got)); *mpp = NULL; /* otherwise the caller will destroy it */ } else { /* Not the first time we're trapping; reuse continuation terms */ - hp = HAlloc(sc_context_ptr->p, 7); + hp = HAlloc(ctx->p, 7); continuation = TUPLE6( hp, - sc_context_ptr->prev_continuation_tptr[1], + ctx->prev_continuation_tptr[1], make_small(slot_ix), - sc_context_ptr->prev_continuation_tptr[3], - sc_context_ptr->prev_continuation_tptr[4], - sc_context_ptr->match_list, + ctx->prev_continuation_tptr[3], + ctx->prev_continuation_tptr[4], + ctx->match_list, make_small(got)); } - *ret = bif_trap1(&ets_select_continue_exp, sc_context_ptr->p, continuation); + ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p, + continuation); return DB_ERROR_NONE; } -static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { +static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, + int reverse, Eterm *ret) +{ return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret); } -static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, +static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, int reverse, Eterm *ret) { - mtraversal_select_chunk_context_t sc_context; - sc_context.p = p; - sc_context.tb = &tbl->hash; - sc_context.tid = tid; - sc_context.hp = NULL; - sc_context.chunk_size = chunk_size; - sc_context.match_list = NIL; - sc_context.prev_continuation_tptr = NULL; + select_chunk_context_t ctx; + + ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match; + ctx.base.on_match_res = select_chunk_on_match_res; + ctx.base.on_loop_ended = select_chunk_on_loop_ended; + ctx.base.on_trap = select_chunk_on_trap, + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.hp = NULL; + ctx.chunk_size = chunk_size; + ctx.match_list = NIL; + ctx.prev_continuation_tptr = NULL; return match_traverse( - sc_context.p, sc_context.tb, + ctx.p, ctx.tb, pattern, NULL, - sc_context.chunk_size, + ctx.chunk_size, MAX_SELECT_CHUNK_ITERATIONS, - &sc_context.hp, 0, - mtraversal_select_chunk_on_nothing_can_match, - mtraversal_select_chunk_on_match_res, - mtraversal_select_chunk_on_loop_ended, - mtraversal_select_chunk_on_trap, - &sc_context, ret); + &ctx.hp, 0, + &ctx.base, ret); } /* @@ -1699,47 +1730,50 @@ static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm patte * */ -static int mtraversal_select_chunk_continue_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret) +static +int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, + Eterm* ret) { - mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm continuation; Eterm rest = NIL; Eterm* hp; ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS); - BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); - if (sc_context_ptr->chunk_size) { + BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + if (ctx->chunk_size) { Sint rest_size = 0; - if (got > sc_context_ptr->chunk_size) { + if (got > ctx->chunk_size) { /* Cannot write destructively here, the list may have been in user space */ - hp = HAlloc(sc_context_ptr->p, (got - sc_context_ptr->chunk_size) * 2); - while (got-- > sc_context_ptr->chunk_size) { - rest = CONS(hp, CAR(list_val(sc_context_ptr->match_list)), rest); + hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2); + while (got-- > ctx->chunk_size) { + rest = CONS(hp, CAR(list_val(ctx->match_list)), rest); hp += 2; - sc_context_ptr->match_list = CDR(list_val(sc_context_ptr->match_list)); + ctx->match_list = CDR(list_val(ctx->match_list)); ++rest_size; } } if (rest != NIL || slot_ix >= 0) { - hp = HAlloc(sc_context_ptr->p, 3 + 7); + hp = HAlloc(ctx->p, 3 + 7); continuation = TUPLE6( hp, - sc_context_ptr->prev_continuation_tptr[1], + ctx->prev_continuation_tptr[1], make_small(slot_ix), - sc_context_ptr->prev_continuation_tptr[3], - sc_context_ptr->prev_continuation_tptr[4], + ctx->prev_continuation_tptr[3], + ctx->prev_continuation_tptr[4], rest, make_small(rest_size)); hp += 7; - *ret = TUPLE2(hp, sc_context_ptr->match_list, continuation); + *ret = TUPLE2(hp, ctx->match_list, continuation); return DB_ERROR_NONE; } else { - if (sc_context_ptr->match_list != NIL) { - hp = HAlloc(sc_context_ptr->p, 3); - *ret = TUPLE2(hp, sc_context_ptr->match_list, am_EOT); + if (ctx->match_list != NIL) { + hp = HAlloc(ctx->p, 3); + *ret = TUPLE2(hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { *ret = am_EOT; @@ -1747,15 +1781,17 @@ static int mtraversal_select_chunk_continue_on_loop_ended(void* context_ptr, Sin } } } - *ret = sc_context_ptr->match_list; + *ret = ctx->match_list; return DB_ERROR_NONE; } /* * This is called when select traps */ -static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { - mtraversal_select_chunk_context_t sc_context = {0}; +static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, + Eterm* ret) +{ + select_chunk_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; @@ -1790,21 +1826,21 @@ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, match_list = tptr[5]; /* Proceed */ - sc_context.p = p; - sc_context.tb = &tbl->hash; - sc_context.tid = tid; - sc_context.hp = NULL; - sc_context.chunk_size = chunk_size; - sc_context.match_list = match_list; - sc_context.prev_continuation_tptr = tptr; + ctx.base.on_match_res = select_chunk_on_match_res; + ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended; + ctx.base.on_trap = select_chunk_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.hp = NULL; + ctx.chunk_size = chunk_size; + ctx.match_list = match_list; + ctx.prev_continuation_tptr = tptr; return match_traverse_continue( - sc_context.p, sc_context.tb, sc_context.chunk_size, - iterations_left, &sc_context.hp, slot_ix, got, &mp, 0, - mtraversal_select_chunk_on_match_res, /* Reuse callback */ - mtraversal_select_chunk_continue_on_loop_ended, - mtraversal_select_chunk_on_trap, /* Reuse callback */ - &sc_context, ret); + ctx.p, ctx.tb, ctx.chunk_size, + iterations_left, &ctx.hp, slot_ix, got, &mp, 0, + &ctx.base, ret); badparam: *ret = NIL; @@ -1823,75 +1859,83 @@ badparam: #define MAX_SELECT_COUNT_ITERATIONS 1000 typedef struct { + match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; - Eterm* hp; Eterm* prev_continuation_tptr; -} mtraversal_select_count_context_t; +} select_count_context_t; -static int mtraversal_select_count_on_nothing_can_match(void* context_ptr, Eterm* ret) { +static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base, + Eterm* ret) +{ *ret = make_small(0); return DB_ERROR_NONE; } -static int mtraversal_select_count_on_match_res(void* context_ptr, Sint slot_ix, - HashDbTerm*** current_ptr_ptr, - Eterm match_res) +static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) { return (match_res == am_true); } -static int mtraversal_select_count_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret) +static int select_count_on_loop_ended(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, + Eterm* ret) { - mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr; + select_count_context_t* ctx = (select_count_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS); - BUMP_REDS(scnt_context_ptr->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left); - *ret = erts_make_integer(got, scnt_context_ptr->p); + BUMP_REDS(ctx->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left); + *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } -static int mtraversal_select_count_on_trap(void* context_ptr, Sint slot_ix, Sint got, - Binary** mpp, Eterm* ret) +static int select_count_on_trap(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) { - mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr; - return on_mtraversal_simple_trap( + select_count_context_t* ctx = (select_count_context_t*) ctx_base; + return on_simple_trap( &ets_select_count_continue_exp, - scnt_context_ptr->p, - scnt_context_ptr->tb, - scnt_context_ptr->tid, - scnt_context_ptr->prev_continuation_tptr, + ctx->p, + ctx->tb, + ctx->tid, + ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } -static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - mtraversal_select_count_context_t scnt_context = {0}; +static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + select_count_context_t ctx; Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS; Sint chunk_size = 0; - scnt_context.p = p; - scnt_context.tb = &tbl->hash; - scnt_context.tid = tid; - scnt_context.hp = NULL; - scnt_context.prev_continuation_tptr = NULL; + ctx.base.on_nothing_can_match = select_count_on_nothing_can_match; + ctx.base.on_match_res = select_count_on_match_res; + ctx.base.on_loop_ended = select_count_on_loop_ended; + ctx.base.on_trap = select_count_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = NULL; return match_traverse( - scnt_context.p, scnt_context.tb, + ctx.p, ctx.tb, pattern, NULL, chunk_size, iterations_left, NULL, 0, - mtraversal_select_count_on_nothing_can_match, - mtraversal_select_count_on_match_res, - mtraversal_select_count_on_loop_ended, - mtraversal_select_count_on_trap, - &scnt_context, ret); + &ctx.base, ret); } /* * This is called when select_count traps */ -static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { - mtraversal_select_count_context_t scnt_context = {0}; +static int db_select_count_continue_hash(Process* p, DbTable* tbl, + Eterm continuation, Eterm* ret) +{ + select_count_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; @@ -1900,25 +1944,24 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continu Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } - scnt_context.p = p; - scnt_context.tb = &tbl->hash; - scnt_context.tid = tid; - scnt_context.hp = NULL; - scnt_context.prev_continuation_tptr = tptr; + ctx.base.on_match_res = select_count_on_match_res; + ctx.base.on_loop_ended = select_count_on_loop_ended; + ctx.base.on_trap = select_count_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = tptr; return match_traverse_continue( - scnt_context.p, scnt_context.tb, chunk_size, + ctx.p, ctx.tb, chunk_size, MAX_SELECT_COUNT_ITERATIONS, NULL, slot_ix, got, &mp, 0, - mtraversal_select_count_on_match_res, /* Reuse callback */ - mtraversal_select_count_on_loop_ended, /* Reuse callback */ - mtraversal_select_count_on_trap, /* Reuse callback */ - &scnt_context, ret); + &ctx.base, ret); } #undef MAX_SELECT_COUNT_ITERATIONS @@ -1933,104 +1976,119 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continu #define MAX_SELECT_DELETE_ITERATIONS 1000 typedef struct { + match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; - Eterm* hp; Eterm* prev_continuation_tptr; erts_aint_t fixated_by_me; Uint last_pseudo_delete; -} mtraversal_select_delete_context_t; + HashDbTerm* free_us; +} select_delete_context_t; -static int mtraversal_select_delete_on_nothing_can_match(void* context_ptr, Eterm* ret) { +static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base, + Eterm* ret) +{ *ret = make_small(0); return DB_ERROR_NONE; } -static int mtraversal_select_delete_on_match_res(void* context_ptr, Sint slot_ix, - HashDbTerm*** current_ptr_ptr, - Eterm match_res) +static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) { HashDbTerm** current_ptr = *current_ptr_ptr; - mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; + select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; HashDbTerm* del; if (match_res != am_true) return 0; - if (NFIXED(sd_context_ptr->tb) > sd_context_ptr->fixated_by_me) { /* fixated by others? */ - if (slot_ix != sd_context_ptr->last_pseudo_delete) { - if (!add_fixed_deletion(sd_context_ptr->tb, slot_ix, sd_context_ptr->fixated_by_me)) + if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */ + if (slot_ix != ctx->last_pseudo_delete) { + if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me)) goto do_erase; - sd_context_ptr->last_pseudo_delete = slot_ix; + ctx->last_pseudo_delete = slot_ix; } - (*current_ptr)->hvalue = INVALID_HASH; + (*current_ptr)->pseudo_deleted = 1; } else { do_erase: del = *current_ptr; *current_ptr = (*current_ptr)->next; // replace pointer to term using next - free_term(sd_context_ptr->tb, del); + del->next = ctx->free_us; + ctx->free_us = del; } - erts_atomic_dec_nob(&sd_context_ptr->tb->common.nitems); + erts_atomic_dec_nob(&ctx->tb->common.nitems); return 1; } -static int mtraversal_select_delete_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret) +static int select_delete_on_loop_ended(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, + Eterm* ret) { - mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; + select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; + free_term_list(ctx->tb, ctx->free_us); + ctx->free_us = NULL; ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS); - BUMP_REDS(sd_context_ptr->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); + BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); if (got) { - try_shrink(sd_context_ptr->tb); + try_shrink(ctx->tb); } - *ret = erts_make_integer(got, sd_context_ptr->p); + *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } -static int mtraversal_select_delete_on_trap(void* context_ptr, Sint slot_ix, Sint got, - Binary** mpp, Eterm* ret) +static int select_delete_on_trap(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) { - mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; - return on_mtraversal_simple_trap( + select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; + free_term_list(ctx->tb, ctx->free_us); + ctx->free_us = NULL; + return on_simple_trap( &ets_select_delete_continue_exp, - sd_context_ptr->p, - sd_context_ptr->tb, - sd_context_ptr->tid, - sd_context_ptr->prev_continuation_tptr, + ctx->p, + ctx->tb, + ctx->tid, + ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } -static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - mtraversal_select_delete_context_t sd_context = {0}; +static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + select_delete_context_t ctx; Sint chunk_size = 0; - sd_context.p = p; - sd_context.tb = &tbl->hash; - sd_context.tid = tid; - sd_context.hp = NULL; - sd_context.prev_continuation_tptr = NULL; - sd_context.fixated_by_me = sd_context.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */ - sd_context.last_pseudo_delete = (Uint) -1; + ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match; + ctx.base.on_match_res = select_delete_on_match_res; + ctx.base.on_loop_ended = select_delete_on_loop_ended; + ctx.base.on_trap = select_delete_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = NULL; + ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */ + ctx.last_pseudo_delete = (Uint) -1; + ctx.free_us = NULL; return match_traverse( - sd_context.p, sd_context.tb, + ctx.p, ctx.tb, pattern, NULL, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, 1, - mtraversal_select_delete_on_nothing_can_match, - mtraversal_select_delete_on_match_res, - mtraversal_select_delete_on_loop_ended, - mtraversal_select_delete_on_trap, - &sd_context, ret); + &ctx.base, ret); } /* * This is called when select_delete traps */ -static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { - mtraversal_select_delete_context_t sd_context = {0}; +static int db_select_delete_continue_hash(Process* p, DbTable* tbl, + Eterm continuation, Eterm* ret) +{ + select_delete_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; @@ -2038,27 +2096,27 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm contin Sint slot_ix; Sint chunk_size = 0; - if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } - sd_context.p = p; - sd_context.tb = &tbl->hash; - sd_context.tid = tid; - sd_context.hp = NULL; - sd_context.prev_continuation_tptr = tptr; - sd_context.fixated_by_me = ONLY_WRITER(p, sd_context.tb) ? 0 : 1; /* TODO: something nicer */ - sd_context.last_pseudo_delete = (Uint) -1; + ctx.base.on_match_res = select_delete_on_match_res; + ctx.base.on_loop_ended = select_delete_on_loop_ended; + ctx.base.on_trap = select_delete_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = tptr; + ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */ + ctx.last_pseudo_delete = (Uint) -1; + ctx.free_us = NULL; return match_traverse_continue( - sd_context.p, sd_context.tb, chunk_size, + ctx.p, ctx.tb, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, slot_ix, got, &mp, 1, - mtraversal_select_delete_on_match_res, /* Reuse callback */ - mtraversal_select_delete_on_loop_ended, /* Reuse callback */ - mtraversal_select_delete_on_trap, /* Reuse callback */ - &sd_context, ret); + &ctx.base, ret); } #undef MAX_SELECT_DELETE_ITERATIONS @@ -2073,24 +2131,26 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm contin #define MAX_SELECT_REPLACE_ITERATIONS 1000 typedef struct { + match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; - Eterm* hp; Eterm* prev_continuation_tptr; -} mtraversal_select_replace_context_t; +} select_replace_context_t; -static int mtraversal_select_replace_on_nothing_can_match(void* context_ptr, Eterm* ret) { +static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base, + Eterm* ret) +{ *ret = make_small(0); return DB_ERROR_NONE; } -static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_ix, - HashDbTerm*** current_ptr_ptr, - Eterm match_res) +static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) { - mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; - DbTableHash* tb = sr_context_ptr->tb; + select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; + DbTableHash* tb = ctx->tb; HashDbTerm* new; HashDbTerm* next; HashValue hval; @@ -2106,6 +2166,7 @@ static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_i new = new_dbterm(tb, match_res); new->next = next; new->hvalue = hval; + new->pseudo_deleted = 0; free_term(tb, **current_ptr_ptr); **current_ptr_ptr = new; /* replace 'next' pointer in previous object */ *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */ @@ -2114,35 +2175,37 @@ static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_i return 0; } -static int mtraversal_select_replace_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, - Sint iterations_left, Binary** mpp, Eterm* ret) +static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, + Sint got, Sint iterations_left, + Binary** mpp, Eterm* ret) { - mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; + select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS); /* the more objects we've replaced, the more reductions we've consumed */ - BUMP_REDS(sr_context_ptr->p, + BUMP_REDS(ctx->p, MIN(MAX_SELECT_REPLACE_ITERATIONS * 2, (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got)); - *ret = erts_make_integer(got, sr_context_ptr->p); + *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } -static int mtraversal_select_replace_on_trap(void* context_ptr, Sint slot_ix, Sint got, - Binary** mpp, Eterm* ret) +static int select_replace_on_trap(match_callbacks_t* ctx_base, + Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) { - mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; - return on_mtraversal_simple_trap( + select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; + return on_simple_trap( &ets_select_replace_continue_exp, - sr_context_ptr->p, - sr_context_ptr->tb, - sr_context_ptr->tid, - sr_context_ptr->prev_continuation_tptr, + ctx->p, + ctx->tb, + ctx->tid, + ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - mtraversal_select_replace_context_t sr_context = {0}; + select_replace_context_t ctx; Sint chunk_size = 0; /* Bag implementation presented both semantic consistency and performance issues, @@ -2150,22 +2213,21 @@ static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pat */ ASSERT(!(tbl->hash.common.status & DB_BAG)); - sr_context.p = p; - sr_context.tb = &tbl->hash; - sr_context.tid = tid; - sr_context.hp = NULL; - sr_context.prev_continuation_tptr = NULL; + ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match; + ctx.base.on_match_res = select_replace_on_match_res; + ctx.base.on_loop_ended = select_replace_on_loop_ended; + ctx.base.on_trap = select_replace_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = NULL; return match_traverse( - sr_context.p, sr_context.tb, + ctx.p, ctx.tb, pattern, db_match_keeps_key, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, 1, - mtraversal_select_replace_on_nothing_can_match, - mtraversal_select_replace_on_match_res, - mtraversal_select_replace_on_loop_ended, - mtraversal_select_replace_on_trap, - &sr_context, ret); + &ctx.base, ret); } /* @@ -2173,7 +2235,7 @@ static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pat */ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { - mtraversal_select_replace_context_t sr_context = {0}; + select_replace_context_t ctx; Eterm* tptr; Eterm tid ; Binary* mp; @@ -2182,26 +2244,25 @@ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm conti Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } /* Proceed */ - sr_context.p = p; - sr_context.tb = &tbl->hash; - sr_context.tid = tid; - sr_context.hp = NULL; - sr_context.prev_continuation_tptr = tptr; + ctx.base.on_match_res = select_replace_on_match_res; + ctx.base.on_loop_ended = select_replace_on_loop_ended; + ctx.base.on_trap = select_replace_on_trap; + ctx.p = p; + ctx.tb = &tbl->hash; + ctx.tid = tid; + ctx.prev_continuation_tptr = tptr; return match_traverse_continue( - sr_context.p, sr_context.tb, chunk_size, + ctx.p, ctx.tb, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, slot_ix, got, &mp, 1, - mtraversal_select_replace_on_match_res, /* Reuse callback */ - mtraversal_select_replace_on_loop_ended, /* Reuse callback */ - mtraversal_select_replace_on_trap, /* Reuse callback */ - &sr_context, ret); + &ctx.base, ret); } @@ -2209,6 +2270,7 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashDbTerm **bp, *b; + HashDbTerm *free_us = NULL; HashValue hval = MAKE_HASH(key); erts_rwmtx_t *lck = WLOCK_HASH(tb, hval); int ix = hash_to_ix(tb, hval); @@ -2226,12 +2288,13 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) && add_fixed_deletion(tb, ix, 0)) { /* Pseudo remove (no need to keep several of same key) */ bp = &b->next; - b->hvalue = INVALID_HASH; + b->pseudo_deleted = 1; b = b->next; } else { - *bp = b->next; - free_term(tb, b); - b = *bp; + HashDbTerm* next = b->next; + b->next = free_us; + free_us = b; + b = *bp = next; } } break; @@ -2242,6 +2305,7 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } + free_term_list(tb, free_us); return DB_ERROR_NONE; } @@ -2255,25 +2319,52 @@ void db_initialize_hash(void) } -int db_mark_all_deleted_hash(DbTable *tbl) +static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds) { + const int LOOPS_PER_REDUCTION = 8; DbTableHash *tb = &tbl->hash; - HashDbTerm* list; + FixedDeletion* fixdel; + SWord loops = reds * LOOPS_PER_REDUCTION; int i; ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb)); - for (i = 0; i < NACTIVE(tb); i++) { - if ((list = BUCKET(tb,i)) != NULL) { - add_fixed_deletion(tb, i, 0); - do { - list->hvalue = INVALID_HASH; - list = list->next; - }while(list != NULL); - } + fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel); + if (fixdel && fixdel->trap) { + /* Continue after trap */ + ASSERT(fixdel->all); + ASSERT(fixdel->slot < NACTIVE(tb)); + i = fixdel->slot; + } + else { + /* First call */ + fixdel = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, + (DbTable *) tb, + sizeof(FixedDeletion)); + ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); + link_fixdel(tb, fixdel, 0); + i = 0; } + + do { + HashDbTerm* b; + for (b = BUCKET(tb,i); b; b = b->next) + b->pseudo_deleted = 1; + } while (++i < NACTIVE(tb) && --loops > 0); + + if (i < NACTIVE(tb)) { + /* Yield */ + fixdel->slot = i; + fixdel->all = 0; + fixdel->trap = 1; + return -1; + } + + fixdel->slot = NACTIVE(tb) - 1; + fixdel->all = 1; + fixdel->trap = 0; erts_atomic_set_nob(&tb->common.nitems, 0); - return DB_ERROR_NONE; + return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION; } @@ -2316,7 +2407,7 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) continue; erts_print(to, to_arg, "%d: [", i); while(list != 0) { - if (list->hvalue == INVALID_HASH) + if (is_pseudo_deleted(list)) erts_print(to, to_arg, "*"); if (tb->common.compress) { Eterm key = GETKEY(tb, list->dbterm.tpl); @@ -2335,9 +2426,9 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) } } -/* release all memory occupied by a single table */ -static int db_free_table_hash(DbTable *tbl) +static int db_free_empty_table_hash(DbTable *tbl) { + ASSERT(NITEMS(tbl) == 0); while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0) ; return 0; @@ -2415,7 +2506,6 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, mpi->num_lists = 0; mpi->key_given = 1; mpi->something_can_match = 0; - mpi->all_objects = 1; mpi->mp = NULL; for (lst = pattern; is_list(lst); lst = CDR(list_val(lst))) @@ -2468,7 +2558,6 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { - mpi->all_objects = 0; } ++i; if (!(mpi->key_given)) { @@ -2682,7 +2771,7 @@ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, if (!sz) { ptr = ptr1; while(ptr != ptr2) { - if (ptr->hvalue != INVALID_HASH) + if (!is_pseudo_deleted(ptr)) sz += ptr->dbterm.size + 2; ptr = ptr->next; } @@ -2693,7 +2782,7 @@ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, ptr = ptr1; while(ptr != ptr2) { - if (ptr->hvalue != INVALID_HASH) { + if (!is_pseudo_deleted(ptr)) { copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p)); list = CONS(hp, copy, list); hp += 2; @@ -2786,7 +2875,7 @@ static void grow(DbTableHash* tb, int nitems) p = *pnext; to_pnext = &BUCKET(tb, to_ix); while (p != NULL) { - if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */ + if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */ *pnext = p->next; free_term(tb, p); p = *pnext; @@ -2859,7 +2948,7 @@ static void shrink(DbTableHash* tb, int nitems) * as we must step through "src" anyway to purge pseudo deleted. */ while(*bp != NULL) { - if ((*bp)->hvalue == INVALID_HASH) { + if (is_pseudo_deleted(*bp)) { HashDbTerm* deleted = *bp; *bp = deleted->next; free_term(tb, deleted); @@ -2917,7 +3006,7 @@ static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr)); for ( ; list != NULL; list = list->next) { - if (list->hvalue != INVALID_HASH) + if (!is_pseudo_deleted(list)) return list; } @@ -2926,7 +3015,7 @@ static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr list = BUCKET(tb,i); while (list != NULL) { - if (list->hvalue != INVALID_HASH) { + if (!is_pseudo_deleted(list)) { *iptr = i; return list; } @@ -2959,7 +3048,7 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, break; } if (has_key(tb, b, key, hval)) { - if (b->hvalue != INVALID_HASH) { + if (!is_pseudo_deleted(b)) { goto Ldone; } break; @@ -2989,16 +3078,18 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, HashDbTerm *q = new_dbterm(tb, obj); q->hvalue = hval; + q->pseudo_deleted = 0; q->next = NULL; *bp = b = q; flags |= DB_INC_TRY_GROW; } else { HashDbTerm *q, *next = b->next; - ASSERT(b->hvalue == INVALID_HASH); + ASSERT(is_pseudo_deleted(b)); q = replace_dbterm(tb, b, obj); q->next = next; - q->hvalue = hval; + ASSERT(q->hvalue == hval); + q->pseudo_deleted = 0; *bp = b = q; erts_atomic_inc_nob(&tb->common.nitems); } @@ -3036,7 +3127,7 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) { if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue), 0)) { - b->hvalue = INVALID_HASH; + b->pseudo_deleted = 1; } else { *bp = b->next; free_me = b; @@ -3073,16 +3164,19 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) return; } -static int db_delete_all_objects_hash(Process* p, DbTable* tbl) +static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds) { if (IS_FIXED(tbl)) { - db_mark_all_deleted_hash(tbl); + reds = db_mark_all_deleted_hash(tbl, reds); } else { - db_free_table_hash(tbl); + reds = db_free_table_continue_hash(tbl, reds); + if (reds < 0) + return reds; + db_create_hash(p, tbl); erts_atomic_set_nob(&tbl->hash.common.nitems, 0); } - return 0; + return reds; } void db_foreach_offheap_hash(DbTable *tbl, @@ -3125,7 +3219,7 @@ void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats) len = 0; for (b = BUCKET(tb,ix); b!=NULL; b=b->next) { len++; - if (b->hvalue == INVALID_HASH) + if (is_pseudo_deleted(b)) ++kept_items; } sum += len; diff --git a/erts/emulator/beam/erl_db_hash.h b/erts/emulator/beam/erl_db_hash.h index 7d27609825..08e5b13db1 100644 --- a/erts/emulator/beam/erl_db_hash.h +++ b/erts/emulator/beam/erl_db_hash.h @@ -24,13 +24,26 @@ #include "erl_db_util.h" /* DbTerm & DbTableCommon */ typedef struct fixed_deletion { - int slot; + UWord slot : sizeof(UWord)*8 - 2; + UWord all : 1; + UWord trap : 1; struct fixed_deletion *next; } FixedDeletion; + +typedef Uint32 HashVal; + typedef struct hash_db_term { struct hash_db_term* next; /* next bucket */ - HashValue hvalue; /* stored hash value */ +#if SIZEOF_VOID_P == 4 + Uint32 hvalue : 31; /* stored hash value */ + Uint32 pseudo_deleted : 1; +# define MAX_HASH_MASK (((Uint32)1 << 31)-1) +#elif SIZEOF_VOID_P == 8 + Uint32 hvalue; + Uint32 pseudo_deleted; +# define MAX_HASH_MASK ((Uint32)(Sint32)-1) +#endif DbTerm dbterm; /* The actual term */ } HashDbTerm; @@ -86,9 +99,6 @@ int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret); int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret); -/* not yet in method table */ -int db_mark_all_deleted_hash(DbTable *tbl); - typedef struct { float avg_chain_len; float std_dev_chain_len; diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 5a276b9d88..0692583dd4 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -170,11 +170,6 @@ static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old, #define DIR_END 2 /* - * Special binary flag - */ -#define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1 - -/* * Number of records to delete before trapping. */ #define DELETE_RECORD_LIMIT 12000 @@ -218,9 +213,6 @@ static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show, * functions. */ struct mp_info { - int all_objects; /* True if complete objects are always - * returned from the match_spec (can use - * copy_shallow on the return value) */ int something_can_match; /* The match_spec is not "impossible" */ int some_limitation; /* There is some limitation on the search * area, i. e. least and/or most is set.*/ @@ -248,7 +240,6 @@ struct select_context { Eterm *lastobj; Sint32 max; int keypos; - int all_objects; Sint got; Sint chunk_size; }; @@ -263,7 +254,6 @@ struct select_count_context { Eterm *lastobj; Sint32 max; int keypos; - int all_objects; Sint got; }; @@ -293,7 +283,6 @@ struct select_replace_context { Eterm *lastobj; Sint32 max; int keypos; - int all_objects; Sint replaced; }; @@ -428,7 +417,7 @@ static int db_select_replace_continue_tree(Process *p, DbTable *tbl, static int db_take_tree(Process *, DbTable *, Eterm, Eterm *); static void db_print_tree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); -static int db_free_table_tree(DbTable *tbl); +static int db_free_empty_table_tree(DbTable *tbl); static SWord db_free_table_continue_tree(DbTable *tbl, SWord); @@ -436,7 +425,7 @@ static void db_foreach_offheap_tree(DbTable *, void (*)(ErlOffHeap *, void *), void *); -static int db_delete_all_objects_tree(Process* p, DbTable* tbl); +static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds); #ifdef HARDDEBUG static void db_check_table_tree(DbTable *tbl); @@ -481,7 +470,7 @@ DbTableMethod db_tree = db_select_replace_continue_tree, db_take_tree, db_delete_all_objects_tree, - db_free_table_tree, + db_free_empty_table_tree, db_free_table_continue_tree, db_print_tree, db_foreach_offheap_tree, @@ -992,7 +981,6 @@ static int db_select_continue_tree(Process *p, sc.lastobj = NULL; sc.max = 1000; sc.keypos = tb->common.keypos; - sc.all_objects = mp->intern.flags & BIN_FLAG_ALL_OBJECTS; sc.chunk_size = chunk_size; reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); @@ -1143,7 +1131,6 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, } sc.mp = mpi.mp; - sc.all_objects = mpi.all_objects; if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { @@ -1183,8 +1170,6 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, sz = size_object(key); hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); - if (mpi.all_objects) - (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; mpb= erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 @@ -1346,7 +1331,6 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, } sc.mp = mpi.mp; - sc.all_objects = mpi.all_objects; if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { @@ -1381,8 +1365,6 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, hp += BIG_UINT_HEAP_SIZE; } key = copy_struct(key, sz, &hp, &MSO(p)); - if (mpi.all_objects) - (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE5 @@ -1449,7 +1431,6 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, } sc.mp = mpi.mp; - sc.all_objects = mpi.all_objects; if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { @@ -1506,8 +1487,6 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, sz = size_object(key); hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); - if (mpi.all_objects) - (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 @@ -1532,8 +1511,6 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); - if (mpi.all_objects) - (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 (hp, @@ -1882,7 +1859,6 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, } sc.mp = mpi.mp; - sc.all_objects = mpi.all_objects; stack = get_static_stack(tb); if (!mpi.got_partial && mpi.some_limitation && @@ -1928,8 +1904,6 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, hp += BIG_UINT_HEAP_SIZE; } key = copy_struct(key, sz, &hp, &MSO(p)); - if (mpi.all_objects) - (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE5 @@ -1995,8 +1969,9 @@ static void db_print_tree(fmtfn_t to, void *to_arg, } /* release all memory occupied by a single table */ -static int db_free_table_tree(DbTable *tbl) +static int db_free_empty_table_tree(DbTable *tbl) { + ASSERT(tbl->tree.root == NULL); while (db_free_table_continue_tree(tbl, ERTS_SWORD_MAX) < 0) ; return 1; @@ -2023,12 +1998,14 @@ static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds) return reds; } -static int db_delete_all_objects_tree(Process* p, DbTable* tbl) +static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds) { - db_free_table_tree(tbl); + reds = db_free_table_continue_tree(tbl, reds); + if (reds < 0) + return reds; db_create_tree(p, tbl); erts_atomic_set_nob(&tbl->tree.common.nitems, 0); - return 0; + return reds; } static void do_db_tree_foreach_offheap(TreeDbTerm *, @@ -2214,7 +2191,6 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, mpi->got_partial = 0; mpi->something_can_match = 0; mpi->mp = NULL; - mpi->all_objects = 1; mpi->save_term = NULL; for (lst = pattern; is_list(lst); lst = CDR(list_val(lst))) @@ -2264,7 +2240,6 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { - mpi->all_objects = 0; } ++i; @@ -3339,8 +3314,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) { return 0; } - ret = db_match_dbterm(&tb->common,sc->p,sc->mp,sc->all_objects, - &this->dbterm, &hp, 2); + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2); if (is_value(ret)) { sc->accum = CONS(hp, ret, sc->accum); } @@ -3371,8 +3345,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) { return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0, - &this->dbterm, NULL, 0); + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { ++(sc->got); } @@ -3401,8 +3374,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, sc->all_objects, - &this->dbterm, &hp, 2); + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2); if (is_value(ret)) { ++(sc->got); sc->accum = CONS(hp, ret, sc->accum); @@ -3437,8 +3409,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, cmp_partly_bound(sc->end_condition, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0) return 0; - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0, - &this->dbterm, NULL, 0); + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { key = GETKEY(sc->tb, this->dbterm.tpl); linkout_tree(sc->tb, key); @@ -3465,8 +3436,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) { return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0, - &(*this)->dbterm, NULL, 0); + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &(*this)->dbterm, NULL, 0); if (is_value(ret)) { TreeDbTerm* new; diff --git a/erts/emulator/beam/erl_db_util.c b/erts/emulator/beam/erl_db_util.c index ef22cda1f0..37d261d0df 100644 --- a/erts/emulator/beam/erl_db_util.c +++ b/erts/emulator/beam/erl_db_util.c @@ -5333,7 +5333,7 @@ void db_free_tmp_uncompressed(DbTerm* obj) } Eterm db_match_dbterm(DbTableCommon* tb, Process* c_p, Binary* bprog, - int all, DbTerm* obj, Eterm** hpp, Uint extra) + DbTerm* obj, Eterm** hpp, Uint extra) { enum erts_pam_run_flags flags; Uint32 dummy; diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h index 6b126f35d6..73d242449e 100644 --- a/erts/emulator/beam/erl_db_util.h +++ b/erts/emulator/beam/erl_db_util.h @@ -32,6 +32,7 @@ ** DMC_DEBUG does NOT need DEBUG, but DEBUG needs DMC_DEBUG */ #define DMC_DEBUG 1 +#define ETS_DBG_FORCE_TRAP 1 #endif /* @@ -180,10 +181,9 @@ typedef struct db_table_method Eterm* ret); int (*db_take)(Process *, DbTable *, Eterm, Eterm *); - int (*db_delete_all_objects)(Process* p, - DbTable* db /* [in out] */ ); + SWord (*db_delete_all_objects)(Process* p, DbTable* db, SWord reds); - int (*db_free_table)(DbTable* db /* [in out] */ ); + int (*db_free_empty_table)(DbTable* db); SWord (*db_free_table_continue)(DbTable* db, SWord reds); void (*db_print)(fmtfn_t to, @@ -267,6 +267,10 @@ typedef struct db_table_common { Uint32 status; /* bit masks defined below */ int keypos; /* defaults to 1 */ int compress; + +#ifdef ETS_DBG_FORCE_TRAP + erts_atomic_t dbg_force_trap; /* &1 force enabled, &2 trap this call */ +#endif } DbTableCommon; /* These are status bit patterns */ @@ -281,9 +285,7 @@ typedef struct db_table_common { #define DB_FINE_LOCKED (1 << 8) /* write_concurrency */ #define DB_FREQ_READ (1 << 9) /* read_concurrency */ #define DB_NAMED_TABLE (1 << 10) - -#define ERTS_ETS_TABLE_TYPES (DB_BAG|DB_SET|DB_DUPLICATE_BAG|DB_ORDERED_SET\ - |DB_FINE_LOCKED|DB_FREQ_READ|DB_NAMED_TABLE) +#define DB_BUSY (1 << 11) #define IS_HASH_TABLE(Status) (!!((Status) & \ (DB_BAG | DB_SET | DB_DUPLICATE_BAG))) @@ -469,7 +471,7 @@ Binary *db_match_compile(Eterm *matchexpr, Eterm *guards, /* Returns newly allocated MatchProg binary with refc == 0*/ Eterm db_match_dbterm(DbTableCommon* tb, Process* c_p, Binary* bprog, - int all, DbTerm* obj, Eterm** hpp, Uint extra); + DbTerm* obj, Eterm** hpp, Uint extra); Eterm db_prog_match(Process *p, Process *self, Binary *prog, Eterm term, diff --git a/lib/stdlib/src/ets.erl b/lib/stdlib/src/ets.erl index 6a559f0be5..a35f79c0d9 100644 --- a/lib/stdlib/src/ets.erl +++ b/lib/stdlib/src/ets.erl @@ -77,7 +77,9 @@ whereis/1]). %% internal exports --export([internal_request_all/0]). +-export([internal_request_all/0, + internal_delete_all/2, + internal_select_delete/2]). -spec all() -> [Tab] when Tab :: tab(). @@ -116,7 +118,15 @@ delete(_, _) -> -spec delete_all_objects(Tab) -> true when Tab :: tab(). -delete_all_objects(_) -> +delete_all_objects(Tab) -> + _ = ets:internal_delete_all(Tab, undefined), + true. + +-spec internal_delete_all(Tab, undefined) -> NumDeleted when + Tab :: tab(), + NumDeleted :: non_neg_integer(). + +internal_delete_all(_, _) -> erlang:nif_error(undef). -spec delete_object(Tab, Object) -> true when @@ -378,7 +388,17 @@ select_count(_, _) -> MatchSpec :: match_spec(), NumDeleted :: non_neg_integer(). -select_delete(_, _) -> +select_delete(Tab, [{'_',[],[true]}]) -> + ets:internal_delete_all(Tab, undefined); +select_delete(Tab, MatchSpec) -> + ets:internal_select_delete(Tab, MatchSpec). + +-spec internal_select_delete(Tab, MatchSpec) -> NumDeleted when + Tab :: tab(), + MatchSpec :: match_spec(), + NumDeleted :: non_neg_integer(). + +internal_select_delete(_, _) -> erlang:nif_error(undef). -spec select_replace(Tab, MatchSpec) -> NumReplaced when diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl index 02211fa8df..574aac96c8 100644 --- a/lib/stdlib/test/ets_SUITE.erl +++ b/lib/stdlib/test/ets_SUITE.erl @@ -87,6 +87,7 @@ -export([t_select_reverse/1]). +-include_lib("stdlib/include/ms_transform.hrl"). % ets:fun2ms -include_lib("common_test/include/ct.hrl"). -define(m(A,B), assert_eq(A,B)). @@ -173,10 +174,12 @@ groups() -> init_per_suite(Config) -> erts_debug:set_internal_state(available_internal_state, true), + erts_debug:set_internal_state(ets_force_trap, true), Config. end_per_suite(_Config) -> stop_spawn_logger(), + erts_debug:set_internal_state(ets_force_trap, false), catch erts_debug:set_internal_state(available_internal_state, false), ok. @@ -812,7 +815,60 @@ t_delete_all_objects_do(Opts) -> 4000 = ets:info(T,size), true = ets:delete_all_objects(T), 0 = ets:info(T,size), - ets:delete(T). + ets:delete(T), + + %% Test delete_all_objects is atomic + T2 = ets:new(t_delete_all_objects, [public | Opts]), + Self = self(), + Inserters = [spawn_link(fun() -> inserter(T2, 100*1000, 1, Self) end) || _ <- [1,2,3,4]], + [receive {Ipid, running} -> ok end || Ipid <- Inserters], + + ets:delete_all_objects(T2), + erlang:yield(), + [Ipid ! stop || Ipid <- Inserters], + Result = [receive {Ipid, stopped, Highest} -> {Ipid,Highest} end || Ipid <- Inserters], + + %% Verify unbroken sequences of objects inserted _after_ ets:delete_all_objects. + Sum = lists:foldl(fun({Ipid, Highest}, AccSum) -> + %% ets:fun2ms(fun({{K,Ipid}}) when K =< Highest -> true end), + AliveMS = [{{{'$1',Ipid}},[{'=<','$1',{const,Highest}}],[true]}], + Alive = ets:select_count(T2, AliveMS), + Lowest = Highest - (Alive-1), + + %% ets:fun2ms(fun({{K,Ipid}}) when K < Lowest -> true end) + DeletedMS = [{{{'$1',Ipid}},[{'<','$1',{const,Lowest}}],[true]}], + 0 = ets:select_count(T2, DeletedMS), + AccSum + Alive + end, + 0, + Result), + ok = case ets:info(T2, size) of + Sum -> ok; + Size -> + io:format("Sum = ~p\nSize = ~p\n", [Sum, Size]), + {Sum,Size} + end, + + ets:delete(T2). + +inserter(_, 0, _, _) -> + ok; +inserter(T, N, Next, Papa) -> + case Next of + 10*1000 -> + Papa ! {self(), running}; + _ -> + ok + end, + + ets:insert(T, {{Next, self()}}), + receive + stop -> + Papa ! {self(), stopped, Next}, + ok + after 0 -> + inserter(T, N-1, Next+1, Papa) + end. %% Test ets:delete_object/2. |