Diffstat (limited to 'erts/emulator/beam/erl_db_hash.c')
-rw-r--r--  erts/emulator/beam/erl_db_hash.c | 3138
1 file changed, 1683 insertions(+), 1455 deletions(-)
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index 5e6fe4f460..752d3ae3a8 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1998-2016. All Rights Reserved.
+ * Copyright Ericsson AB 1998-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
/*
** Implementation of unordered ETS tables.
** The tables are implemented as linear dynamic hash tables.
+** https://en.wikipedia.org/wiki/Linear_hashing
*/
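
For readers new to the scheme: linear hashing grows the table one bucket split at a time instead of rehashing everything at once, so each key must be addressed against the current number of active buckets. A minimal standalone sketch of that address step (hypothetical code, assuming a size mask szm and nactive active buckets; the real computation is hash_to_ix() further down):

#include <stdint.h>

/* Linear hashing address step: hash into the grown range [0, szm];
 * if that bucket has not been split into existence yet, fall back
 * to its "parent" bucket in the old range [0, szm >> 1]. */
static unsigned lh_index(uint32_t hval, unsigned szm, unsigned nactive)
{
    unsigned ix = hval & szm;
    if (ix >= nactive)
        ix &= szm >> 1;
    return ix;
}
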
/* SMP:
@@ -84,46 +85,47 @@
#include "erl_db_hash.h"
-#ifdef MYDEBUG /* Will fail test case ets_SUITE:memory */
-# define IF_DEBUG(x) x
-# define MY_ASSERT(x) ASSERT(x)
-#else
-# define IF_DEBUG(x)
-# define MY_ASSERT(x)
-#endif
-
/*
* The following symbols can be manipulated to "tune" the linear hash array
*/
-#define CHAIN_LEN 6 /* Medium bucket chain len */
+#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
+#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
+
+/*
+** We want the first mandatory segment to be small (to reduce the minimal footprint)
+** and the extra segments to be larger (to reduce the number of alloc/free calls).
+*/
-/* Number of slots per segment */
-#define SEGSZ_EXP 8
-#define SEGSZ (1 << SEGSZ_EXP)
-#define SEGSZ_MASK (SEGSZ-1)
+/* Number of slots in first segment */
+#define FIRST_SEGSZ_EXP 8
+#define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP)
+#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
-#define NSEG_1 2 /* Size of first segment table (must be at least 2) */
+/* Number of slots per extra segment */
+#define EXT_SEGSZ_EXP 11
+#define EXT_SEGSZ (1 << EXT_SEGSZ_EXP)
+#define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
+
+#define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
#define NSEG_2 256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */
-#ifdef ERTS_SMP
# define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)
-#else
-# define DB_USING_FINE_LOCKING(TB) 0
-#endif
#ifdef ETHR_ORDERED_READ_DEPEND
-#define SEGTAB(tb) ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab))
+#define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))
#else
#define SEGTAB(tb) \
(DB_USING_FINE_LOCKING(tb) \
- ? ((struct segment**) erts_smp_atomic_read_ddrb(&(tb)->segtab)) \
- : ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab)))
+ ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab)) \
+ : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)))
#endif
-#define NACTIVE(tb) ((int)erts_smp_atomic_read_nob(&(tb)->nactive))
-#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems))
+#define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive))
+#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
-#define BUCKET(tb, i) SEGTAB(tb)[(i) >> SEGSZ_EXP]->buckets[(i) & SEGSZ_MASK]
+#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
+
+#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
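
With FIRST_SEGSZ_EXP = 8 and EXT_SEGSZ_EXP = 11 as defined above, segment 0 holds slots 0..255 and every later segment holds 2048 slots; the (EXT_SEGSZ - FIRST_SEGSZ) bias in SLOT_IX_TO_SEG_IX is what lets one uniform shift cover the undersized first segment. A few hypothetical sanity checks of the mapping:

#include <assert.h>

static void check_slot_to_seg_mapping(void)
{
    assert(SLOT_IX_TO_SEG_IX(0)    == 0); /* first slot of segment 0 */
    assert(SLOT_IX_TO_SEG_IX(255)  == 0); /* last slot of segment 0  */
    assert(SLOT_IX_TO_SEG_IX(256)  == 1); /* first slot of segment 1 */
    assert(SLOT_IX_TO_SEG_IX(2303) == 1); /* 256 + 2048 - 1          */
    assert(SLOT_IX_TO_SEG_IX(2304) == 2); /* first slot of segment 2 */
}
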
/*
* When deleting a table, the number of records to delete.
@@ -137,110 +139,129 @@
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
Uint mask = (DB_USING_FINE_LOCKING(tb)
- ? erts_smp_atomic_read_acqb(&tb->szm)
- : erts_smp_atomic_read_nob(&tb->szm));
+ ? erts_atomic_read_acqb(&tb->szm)
+ : erts_atomic_read_nob(&tb->szm));
Uint ix = hval & mask;
- if (ix >= erts_smp_atomic_read_nob(&tb->nactive)) {
+ if (ix >= erts_atomic_read_nob(&tb->nactive)) {
ix &= mask>>1;
- ASSERT(ix < erts_smp_atomic_read_nob(&tb->nactive));
+ ASSERT(ix < erts_atomic_read_nob(&tb->nactive));
}
return ix;
}
-/* Remember a slot containing a pseudo-deleted item (INVALID_HASH)
- * Return false if we got raced by unfixing thread
- * and the object should be deleted for real.
- */
-static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix,
- erts_aint_t fixated_by_me)
+
+static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb)
{
- erts_aint_t was_next;
- erts_aint_t exp_next;
FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- sizeof(FixedDeletion));
+ (DbTable *) tb,
+ sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
- fixd->slot = ix;
- was_next = erts_smp_atomic_read_acqb(&tb->fixdel);
+ return fixd;
+}
+
+static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd)
+{
+ erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
+ fixd, sizeof(FixedDeletion));
+ ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+}
+
+static ERTS_INLINE int link_fixdel(DbTableHash* tb,
+ FixedDeletion* fixd,
+ erts_aint_t fixated_by_me)
+{
+ erts_aint_t was_next;
+ erts_aint_t exp_next;
+
+ was_next = erts_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic insertion in linked list: */
if (NFIXED(tb) <= fixated_by_me) {
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
- fixd, sizeof(FixedDeletion));
+ free_fixdel(tb, fixd);
return 0; /* raced by unfixer */
}
exp_next = was_next;
fixd->next = (FixedDeletion*) exp_next;
- was_next = erts_smp_atomic_cmpxchg_mb(&tb->fixdel,
+ was_next = erts_atomic_cmpxchg_mb(&tb->fixdel,
(erts_aint_t) fixd,
exp_next);
}while (was_next != exp_next);
return 1;
}
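
The loop above is the standard lock-free push onto a singly linked list: write the node's next pointer against the head we expect, then publish with a compare-and-swap and retry if another thread won the race. The same pattern in portable C11 atomics, as an illustration only (the emulator uses its own erts_atomic API):

#include <stdatomic.h>

struct node { struct node* next; };

static void lockfree_push(_Atomic(struct node*)* head, struct node* n)
{
    struct node* exp = atomic_load_explicit(head, memory_order_acquire);
    do {
        n->next = exp; /* link behind the head we expect to replace */
    } while (!atomic_compare_exchange_weak_explicit(
                 head, &exp, n,
                 memory_order_acq_rel, memory_order_acquire));
    /* on CAS failure, 'exp' was reloaded with the current head; retry */
}
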
+/* Remember a slot containing a pseudo-deleted item
+ * Return false if we got raced by unfixing thread
+ * and the object should be deleted for real.
+ */
+static int add_fixed_deletion(DbTableHash* tb, int ix,
+ erts_aint_t fixated_by_me)
+{
+ FixedDeletion* fixd = alloc_fixdel(tb);
+ fixd->slot = ix;
+ fixd->all = 0;
+ return link_fixdel(tb, fixd, fixated_by_me);
+}
+
+
+static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p)
+{
+ return p->pseudo_deleted;
+}
-#define MAX_HASH 0xEFFFFFFFUL
-#define INVALID_HASH 0xFFFFFFFFUL
/* optimised version of make_hash (normal case? atomic key) */
#define MAKE_HASH(term) \
((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
- make_internal_hash(term)) % MAX_HASH)
+ make_internal_hash(term, 0)) & MAX_HASH_MASK)
-#ifdef ERTS_SMP
# define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
# define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
+# define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
/* Fine grained read lock */
-static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
+static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
- erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval);
+ erts_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
- erts_smp_rwmtx_rlock(lck);
+ erts_rwmtx_rlock(lck);
return lck;
}
}
/* Fine grained write lock */
-static ERTS_INLINE erts_smp_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
+static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
- erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval);
+ erts_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
- erts_smp_rwmtx_rwlock(lck);
+ erts_rwmtx_rwlock(lck);
return lck;
}
}
-static ERTS_INLINE void RUNLOCK_HASH(erts_smp_rwmtx_t* lck)
+static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck)
{
if (lck != NULL) {
- erts_smp_rwmtx_runlock(lck);
+ erts_rwmtx_runlock(lck);
}
}
-static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck)
+static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck)
{
if (lck != NULL) {
- erts_smp_rwmtx_rwunlock(lck);
+ erts_rwmtx_rwunlock(lck);
}
}
-#else /* ERTS_SMP */
-# define RLOCK_HASH(tb,hval) NULL
-# define WLOCK_HASH(tb,hval) NULL
-# define RUNLOCK_HASH(lck) ((void)lck)
-# define WUNLOCK_HASH(lck) ((void)lck)
-#endif /* ERTS_SMP */
#ifdef ERTS_ENABLE_LOCK_CHECK
# define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
-# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
-# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rwlocked(lck))
-# define IS_TAB_WLOCKED(tb) erts_smp_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
+# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
+# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck))
+# define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
#else
# define IS_HASH_RLOCKED(tb,hval) (1)
# define IS_HASH_WLOCKED(tb,hval) (1)
@@ -253,47 +274,44 @@ static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck)
** Slot READ locks updated accordingly, unlocked if EOT.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
- erts_smp_rwmtx_t** lck_ptr)
+ erts_rwmtx_t** lck_ptr)
{
-#ifdef ERTS_SMP
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
RUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix);
return ix;
-#else
- return (++ix < NACTIVE(tb)) ? ix : 0;
-#endif
}
/* Same as next_slot but with WRITE locking */
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
- erts_smp_rwmtx_t** lck_ptr)
+ erts_rwmtx_t** lck_ptr)
{
-#ifdef ERTS_SMP
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
WUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix);
return ix;
-#else
- return next_slot(tb,ix,lck_ptr);
-#endif
}
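
The DB_HASH_LOCK_CNT stride in these helpers is what keeps a full-table traversal cheap: slot i is guarded by lock (i & DB_HASH_LOCK_MASK), so stepping by DB_HASH_LOCK_CNT visits every slot protected by the currently held lock before moving to the next one. A hypothetical read-only walk built on next_slot() (assuming DB_HASH_LOCK_CNT == 64 and 256 active slots, it visits 0, 64, 128, 192 under lock 0, then 1, 65, 129, 193 under lock 1, and so on):

static void walk_all_slots(DbTableHash* tb)
{
    erts_rwmtx_t* lck = RLOCK_HASH(tb, 0);
    Sint ix = 0;
    do {
        /* ... inspect BUCKET(tb, ix) here ... */
        ix = next_slot(tb, ix, &lck);
    } while (ix != 0); /* next_slot() released the last lock at wrap-around */
}
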
-/*
- * Some special binary flags
- */
-#define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1
-
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm));
}
+static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p)
+{
+ while (p) {
+ HashDbTerm* next = p->next;
+ free_term(tb, p);
+ p = next;
+ }
+}
+
+
/*
* Local types
*/
@@ -303,9 +321,6 @@ struct mp_prefound {
};
struct mp_info {
- int all_objects; /* True if complete objects are always
- * returned from the match_spec (can use
- * copy_shallow on the return value) */
int something_can_match; /* The match_spec is not "impossible" */
int key_given;
struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */
@@ -318,83 +333,52 @@ struct mp_info {
/* A table segment */
struct segment {
- HashDbTerm* buckets[SEGSZ];
-#ifdef MYDEBUG
- int is_ext_segment;
-#endif
+ HashDbTerm* buckets[1];
};
+#define SIZEOF_SEGMENT(N) \
+ (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
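
struct segment now uses the classic one-element-array trick for a variable-length bucket array, sized with offsetof so only the requested number of bucket pointers is allocated. A hedged sketch of carving out such a segment (hypothetical malloc stand-in; the real code allocates through erts_db_alloc, as in db_create_hash() below):

#include <stdlib.h>
#include <string.h>

static struct segment* segment_alloc(size_t nslots)
{
    struct segment* seg = malloc(SIZEOF_SEGMENT(nslots));
    if (seg != NULL)
        memset(seg, 0, SIZEOF_SEGMENT(nslots)); /* all buckets start empty */
    return seg;
}
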
-/* A segment that also contains a segment table */
-struct ext_segment {
- struct segment s; /* The segment itself. Must be first */
-
+/* An extended segment table */
+struct ext_segtab {
+ ErtsThrPrgrLaterOp lop;
struct segment** prev_segtab; /* Used when table is shrinking */
- int nsegs; /* Size of segtab */
+ int prev_nsegs; /* Size of prev_segtab */
+ int nsegs; /* Size of this segtab */
struct segment* segtab[1]; /* The segment table */
};
-#define SIZEOF_EXTSEG(NSEGS) \
- (offsetof(struct ext_segment,segtab) + sizeof(struct segment*)*(NSEGS))
-
-#if defined(DEBUG) || defined(VALGRIND)
-# define EXTSEG(SEGTAB_PTR) \
- ((struct ext_segment*) (((char*)(SEGTAB_PTR)) - offsetof(struct ext_segment,segtab)))
-#endif
+#define SIZEOF_EXT_SEGTAB(NSEGS) \
+ (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
struct segment** segtab)
{
if (DB_USING_FINE_LOCKING(tb))
- erts_smp_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
+ erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
else
- erts_smp_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
-#ifdef VALGRIND
- tb->top_ptr_to_segment_with_active_segtab = EXTSEG(segtab);
-#endif
+ erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
}
-
-/* How the table segments relate to each other:
-
- ext_segment: ext_segment: "plain" segment
- #=================# #================# #=============#
- | bucket[0] |<--+ +------->| bucket[256] | +->| bucket[512] |
- | bucket[1] | | | | [257] | | | [513] |
- : : | | : : | : :
- | bucket[255] | | | | [511] | | | [767] |
- |-----------------| | | |----------------| | #=============#
- | prev_segtab=NULL| | | +--<---prev_segtab | |
- | nsegs = 2 | | | | | nsegs = 256 | |
-+->| segtab[0] -->-------+---|---|--<---segtab[0] |<-+ |
-| | segtab[1] -->-----------+---|--<---segtab[1] | | |
-| #=================# | | segtab[2] -->-----|--+ ext_segment:
-| | : : | #================#
-+----------------<---------------+ | segtab[255] ->----|----->| bucket[255*256]|
- #================# | | |
- | : :
- | |----------------|
- +----<---prev_segtab |
- : :
-*/
-
+/* Used by select_replace on analyze_pattern */
+typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
/*
** Forward decl's (static functions)
*/
-static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
- struct segment** old_segtab);
-static int alloc_seg(DbTableHash *tb);
+static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
+static void alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
-static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
- HashDbTerm *list);
+static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
+ HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list);
-static void shrink(DbTableHash* tb, int nactive);
-static void grow(DbTableHash* tb, int nactive);
+static void shrink(DbTableHash* tb, int nitems);
+static void grow(DbTableHash* tb, int nitems);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash*);
-static int analyze_pattern(DbTableHash *tb, Eterm pattern,
- struct mp_info *mpi);
+static int analyze_pattern(DbTableHash *tb, Eterm pattern,
+ extra_match_validator_t extra_validator, /* Optional callback */
+ struct mp_info *mpi);
/*
* Method interface functions
@@ -418,39 +402,44 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_hash(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret);
-static int db_select_chunk_hash(Process *p, DbTable *tbl,
+static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret);
-static int db_select_hash(Process *p, DbTable *tbl,
+static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret);
-static int db_select_count_hash(Process *p, DbTable *tbl,
- Eterm pattern, Eterm *ret);
-static int db_select_delete_hash(Process *p, DbTable *tbl,
- Eterm pattern, Eterm *ret);
-
-static int db_select_continue_hash(Process *p, DbTable *tbl,
+static int db_select_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
-static int db_select_count_continue_hash(Process *p, DbTable *tbl,
+static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_count_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
+static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
+
+static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+
static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
-static void db_print_hash(int to,
+static void db_print_hash(fmtfn_t to,
void *to_arg,
int show,
DbTable *tbl);
-static int db_free_table_hash(DbTable *tbl);
+static int db_free_empty_table_hash(DbTable *tbl);
-static int db_free_table_continue_hash(DbTable *tbl);
+static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
static void db_foreach_offheap_hash(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
-static int db_delete_all_objects_hash(Process* p, DbTable* tbl);
+static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
@@ -463,9 +452,10 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
int nactive = NACTIVE(tb);
- if (nactive > SEGSZ && NITEMS(tb) < (nactive * CHAIN_LEN)
+ int nitems = NITEMS(tb);
+ if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
&& !IS_FIXED(tb)) {
- shrink(tb, nactive);
+ shrink(tb, nitems);
}
}
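
Read together with GROW_LIMIT above, these thresholds give the table hysteresis: with nactive = 2048, growth is considered once nitems exceeds 2048, but shrinking only once nitems drops below 1024, so an item count hovering around one per bucket cannot ping-pong the table between grow() and shrink().
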
@@ -474,7 +464,8 @@ static ERTS_INLINE void try_shrink(DbTableHash* tb)
static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
- if (b->hvalue != hval) return 0;
+ if (b->hvalue != hval || is_pseudo_deleted(b))
+ return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
@@ -487,7 +478,8 @@ static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
- if (b->hvalue != hval && b->hvalue != INVALID_HASH) return 0;
+ if (b->hvalue != hval)
+ return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
@@ -547,17 +539,14 @@ DbTableMethod db_hash =
db_select_delete_continue_hash,
db_select_count_hash,
db_select_count_continue_hash,
+ db_select_replace_hash,
+ db_select_replace_continue_hash,
db_take_hash,
db_delete_all_objects_hash,
- db_free_table_hash,
+ db_free_empty_table_hash,
db_free_table_continue_hash,
db_print_hash,
db_foreach_offheap_hash,
-#ifdef HARDDEBUG
- db_check_table_hash,
-#else
- NULL,
-#endif
db_lookup_dbterm_hash,
db_finalize_dbterm_hash
};
@@ -579,7 +568,7 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
/*int tries = 0;*/
DEBUG_WAIT();
- if (erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
+ if (erts_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
(erts_aint_t) NULL) != (erts_aint_t) NULL) {
/* Oboy, must join lists */
@@ -588,13 +577,13 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
erts_aint_t exp_tail;
while (last->next != NULL) last = last->next;
- was_tail = erts_smp_atomic_read_acqb(&tb->fixdel);
+ was_tail = erts_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic list insertion */
exp_tail = was_tail;
last->next = (FixedDeletion*) exp_tail;
/*++tries;*/
DEBUG_WAIT();
- was_tail = erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
+ was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
exp_tail);
}while (was_tail != exp_tail);
@@ -605,97 +594,110 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
** Table interface routines ie what's called by the bif's
*/
-void db_unfix_table_hash(DbTableHash *tb)
+SWord db_unfix_table_hash(DbTableHash *tb)
{
FixedDeletion* fixdel;
+ SWord work = 0;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
- || (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock)
- && !tb->common.is_thread_safe));
+ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
+ || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock)
+ && !tb->common.is_thread_safe));
restart:
- fixdel = (FixedDeletion*) erts_smp_atomic_xchg_mb(&tb->fixdel,
- (erts_aint_t) NULL);
- while (fixdel != NULL) {
- FixedDeletion *fx = fixdel;
- int ix = fx->slot;
- HashDbTerm **bp;
- HashDbTerm *b;
- erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,ix);
-
- if (IS_FIXED(tb)) { /* interrupted by fixer */
- WUNLOCK_HASH(lck);
- restore_fixdel(tb,fixdel);
- if (!IS_FIXED(tb)) {
- goto restart; /* unfixed again! */
- }
- return;
- }
- if (ix < NACTIVE(tb)) {
- bp = &BUCKET(tb, ix);
- b = *bp;
-
- while (b != NULL) {
- if (b->hvalue == INVALID_HASH) {
- *bp = b->next;
- free_term(tb, b);
- b = *bp;
- } else {
- bp = &b->next;
- b = b->next;
- }
- }
- }
- /* else slot has been joined and purged by shrink() */
- WUNLOCK_HASH(lck);
- fixdel = fx->next;
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- (void *) fx,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+ fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel,
+ (erts_aint_t) NULL);
+ while (fixdel) {
+ FixedDeletion *free_me;
+
+ do {
+ HashDbTerm **bp;
+ HashDbTerm *b;
+ HashDbTerm *free_us = NULL;
+ erts_rwmtx_t* lck;
+
+ lck = WLOCK_HASH(tb, fixdel->slot);
+
+ if (IS_FIXED(tb)) { /* interrupted by fixer */
+ WUNLOCK_HASH(lck);
+ restore_fixdel(tb,fixdel);
+ if (!IS_FIXED(tb)) {
+ goto restart; /* unfixed again! */
+ }
+ return work;
+ }
+ if (fixdel->slot < NACTIVE(tb)) {
+ bp = &BUCKET(tb, fixdel->slot);
+ b = *bp;
+
+ while (b != NULL) {
+ if (is_pseudo_deleted(b)) {
+ HashDbTerm* nxt = b->next;
+ b->next = free_us;
+ free_us = b;
+ work++;
+ b = *bp = nxt;
+ } else {
+ bp = &b->next;
+ b = b->next;
+ }
+ }
+ }
+ /* else slot has been joined and purged by shrink() */
+ WUNLOCK_HASH(lck);
+ free_term_list(tb, free_us);
+
+ }while (fixdel->all && fixdel->slot-- > 0);
+
+ free_me = fixdel;
+ fixdel = fixdel->next;
+ free_fixdel(tb, free_me);
+ work++;
}
/* ToDo: Maybe try grow/shrink the table as well */
+
+ return work;
}
int db_create_hash(Process *p, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
- erts_smp_atomic_init_nob(&tb->szm, SEGSZ_MASK);
- erts_smp_atomic_init_nob(&tb->nactive, SEGSZ);
- erts_smp_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
- erts_smp_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
- SET_SEGTAB(tb, alloc_ext_seg(tb,0,NULL)->segtab);
+ erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
+ erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
+ erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
+ erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
+ SET_SEGTAB(tb, tb->first_segtab);
tb->nsegs = NSEG_1;
- tb->nslots = SEGSZ;
+ tb->nslots = FIRST_SEGSZ;
+ tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_SEGMENT(FIRST_SEGSZ));
+ sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));
- erts_smp_atomic_init_nob(&tb->is_resizing, 0);
-#ifdef ERTS_SMP
+ erts_atomic_init_nob(&tb->is_resizing, 0);
if (tb->common.type & DB_FINE_LOCKED) {
- erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
+ erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
int i;
if (tb->common.type & DB_FREQ_READ)
- rwmtx_opt.type = ERTS_SMP_RWMTX_TYPE_FREQUENT_READ;
+ rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
- tb->locks = (DbTableHashFineLocks*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
- (DbTable *) tb,
- sizeof(DbTableHashFineLocks));
+ tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
+ (DbTable *) tb,
+ sizeof(DbTableHashFineLocks));
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
- erts_smp_rwmtx_init_opt_x(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
- "db_hash_slot", make_small(i));
+ erts_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
+ "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
}
- /* This important property is needed to guarantee that the buckets
- /* This important property is needed to guarantee that the buckets
+ /* This important property is needed to guarantee that the two buckets
* involved in a grow/shrink operation are protected by the same lock:
*/
- ASSERT(erts_smp_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
+ ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
}
else { /* coarse locking */
tb->locks = NULL;
}
ERTS_THR_MEMORY_BARRIER;
-#endif /* ERST_SMP */
return DB_ERROR_NONE;
}
@@ -703,22 +705,12 @@ static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
Uint ix = 0;
- erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix);
+ erts_rwmtx_t* lck = RLOCK_HASH(tb,ix);
HashDbTerm* list;
- for (;;) {
- list = BUCKET(tb,ix);
- if (list != NULL) {
- if (list->hvalue == INVALID_HASH) {
- list = next(tb,&ix,&lck,list);
- }
- break;
- }
- if ((ix=next_slot(tb,ix,&lck)) == 0) {
- list = NULL;
- break;
- }
- }
+ list = BUCKET(tb,ix);
+ list = next_live(tb, &ix, &lck, list);
+
if (list != NULL) {
*ret = db_copy_key(p, tbl, &list->dbterm);
RUNLOCK_HASH(lck);
@@ -736,7 +728,7 @@ static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
HashValue hval;
Uint ix;
HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
@@ -755,13 +747,13 @@ static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
}
/* Key found */
- b = next(tb, &ix, &lck, b);
+ b = next_live(tb, &ix, &lck, b->next);
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b != 0) {
if (!has_live_key(tb, b, key, hval)) {
break;
}
- b = next(tb, &ix, &lck, b);
+ b = next_live(tb, &ix, &lck, b->next);
}
}
if (b == NULL) {
@@ -783,7 +775,7 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
HashDbTerm** bp;
HashDbTerm* b;
HashDbTerm* q;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
int nitems;
int ret = DB_ERROR_NONE;
@@ -808,8 +800,9 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
*/
if (tb->common.status & DB_SET) {
HashDbTerm* bnext = b->next;
- if (b->hvalue == INVALID_HASH) {
- erts_smp_atomic_inc_nob(&tb->common.nitems);
+ if (is_pseudo_deleted(b)) {
+ erts_atomic_inc_nob(&tb->common.nitems);
+ b->pseudo_deleted = 0;
}
else if (key_clash_fail) {
ret = DB_ERROR_BADKEY;
@@ -817,14 +810,14 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
}
q = replace_dbterm(tb, b, obj);
q->next = bnext;
- q->hvalue = hval; /* In case of INVALID_HASH */
+ ASSERT(q->hvalue == hval);
*bp = q;
goto Ldone;
}
else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
q = b;
do {
- if (q->hvalue != INVALID_HASH) {
+ if (!is_pseudo_deleted(q)) {
ret = DB_ERROR_BADKEY;
goto Ldone;
}
@@ -836,9 +829,10 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
q = b;
do {
if (db_eq(&tb->common,obj,&q->dbterm)) {
- if (q->hvalue == INVALID_HASH) {
- erts_smp_atomic_inc_nob(&tb->common.nitems);
- q->hvalue = hval;
+ if (is_pseudo_deleted(q)) {
+ erts_atomic_inc_nob(&tb->common.nitems);
+ q->pseudo_deleted = 0;
+ ASSERT(q->hvalue == hval);
if (q != b) { /* must move to preserve key insertion order */
*qp = q->next;
q->next = b;
@@ -856,17 +850,17 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
Lnew:
q = new_dbterm(tb, obj);
q->hvalue = hval;
+ q->pseudo_deleted = 0;
q->next = b;
*bp = q;
- nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems);
+ nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
{
int nactive = NACTIVE(tb);
- if (nitems > nactive * (CHAIN_LEN+1) && !IS_FIXED(tb)) {
- grow(tb, nactive);
+ if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
+ grow(tb, nitems);
}
}
- CHECK_TABLES();
return DB_ERROR_NONE;
Ldone:
@@ -884,14 +878,13 @@ get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b2 && has_key(tb, b2, key, hval)) {
- if (b2->hvalue != INVALID_HASH)
+ if (!is_pseudo_deleted(b2))
sz += b2->dbterm.size + 2;
b2 = b2->next;
}
}
copy = build_term_list(p, b1, b2, sz, tb);
- CHECK_TABLES();
if (bend) {
*bend = b2;
}
@@ -904,7 +897,7 @@ int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
HashValue hval;
int ix;
HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
@@ -923,70 +916,6 @@ done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
-
-int db_get_element_array(DbTable *tbl,
- Eterm key,
- int ndex,
- Eterm *ret,
- int *num_ret)
-{
- DbTableHash *tb = &tbl->hash;
- HashValue hval;
- int ix;
- HashDbTerm* b1;
- int num = 0;
- int retval;
- erts_smp_rwmtx_t* lck;
-
- ASSERT(!IS_FIXED(tbl)); /* no support for fixed tables here */
-
- hval = MAKE_HASH(key);
- lck = RLOCK_HASH(tb, hval);
- ix = hash_to_ix(tb, hval);
- b1 = BUCKET(tb, ix);
-
- while(b1 != 0) {
- if (has_live_key(tb,b1,key,hval)) {
- if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
- HashDbTerm* b;
- HashDbTerm* b2 = b1->next;
-
- while(b2 != NULL && has_live_key(tb,b2,key,hval)) {
- if (ndex > arityval(b2->dbterm.tpl[0])) {
- retval = DB_ERROR_BADITEM;
- goto done;
- }
- b2 = b2->next;
- }
-
- b = b1;
- while(b != b2) {
- if (num < *num_ret) {
- ret[num++] = b->dbterm.tpl[ndex];
- } else {
- retval = DB_ERROR_NONE;
- goto done;
- }
- b = b->next;
- }
- *num_ret = num;
- }
- else {
- ASSERT(*num_ret > 0);
- ret[0] = b1->dbterm.tpl[ndex];
- *num_ret = 1;
- }
- retval = DB_ERROR_NONE;
- goto done;
- }
- b1 = b1->next;
- }
- retval = DB_ERROR_BADKEY;
-done:
- RUNLOCK_HASH(lck);
- return retval;
-}
-
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
@@ -994,7 +923,7 @@ static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
HashValue hval;
int ix;
HashDbTerm* b1;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
ix = hash_to_ix(tb, hval);
@@ -1023,7 +952,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl,
HashValue hval;
int ix;
HashDbTerm* b1;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
int retval;
hval = MAKE_HASH(key);
@@ -1045,7 +974,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl,
while(b2 != NULL && has_key(tb,b2,key,hval)) {
if (ndex > arityval(b2->dbterm.tpl[0])
- && b2->hvalue != INVALID_HASH) {
+ && !is_pseudo_deleted(b2)) {
retval = DB_ERROR_BADITEM;
goto done;
}
@@ -1053,7 +982,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl,
}
b = b1;
while(b != b2) {
- if (b->hvalue != INVALID_HASH) {
+ if (!is_pseudo_deleted(b)) {
Eterm *hp;
Eterm copy = db_copy_element_from_ets(&tb->common, p,
&b->dbterm, ndex, &hp, 2);
@@ -1079,54 +1008,6 @@ done:
}
/*
- * Very internal interface, removes elements of arity two from
- * BAG. Used for the PID meta table
- */
-int db_erase_bag_exact2(DbTable *tbl, Eterm key, Eterm value)
-{
- DbTableHash *tb = &tbl->hash;
- HashValue hval;
- int ix;
- HashDbTerm** bp;
- HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
- int found = 0;
-
- hval = MAKE_HASH(key);
- lck = WLOCK_HASH(tb,hval);
- ix = hash_to_ix(tb, hval);
- bp = &BUCKET(tb, ix);
- b = *bp;
-
- ASSERT(!IS_FIXED(tb));
- ASSERT((tb->common.status & DB_BAG));
- ASSERT(!tb->common.compress);
-
- while(b != 0) {
- if (has_live_key(tb,b,key,hval)) {
- found = 1;
- if ((arityval(b->dbterm.tpl[0]) == 2) &&
- EQ(value, b->dbterm.tpl[2])) {
- *bp = b->next;
- free_term(tb, b);
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- b = *bp;
- break;
- }
- } else if (found) {
- break;
- }
- bp = &b->next;
- b = b->next;
- }
- WUNLOCK_HASH(lck);
- if (found) {
- try_shrink(tb);
- }
- return DB_ERROR_NONE;
-}
-
-/*
** NB, this is for the db_erase/2 bif.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
@@ -1136,7 +1017,8 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
int ix;
HashDbTerm** bp;
HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
+ HashDbTerm* free_us = NULL;
+ erts_rwmtx_t* lck;
int nitems_diff = 0;
hval = MAKE_HASH(key);
@@ -1151,16 +1033,17 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
if (nitems_diff == -1 && IS_FIXED(tb)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
- b->hvalue = INVALID_HASH;
+ b->pseudo_deleted = 1;
} else {
- *bp = b->next;
- free_term(tb, b);
- b = *bp;
+ HashDbTerm* next = b->next;
+ b->next = free_us;
+ free_us = b;
+ b = *bp = next;
continue;
}
}
else {
- if (nitems_diff && b->hvalue != INVALID_HASH)
+ if (nitems_diff && !is_pseudo_deleted(b))
break;
}
bp = &b->next;
@@ -1168,9 +1051,10 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
- erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
+ erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
+ free_term_list(tb, free_us);
*ret = am_true;
return DB_ERROR_NONE;
}
@@ -1185,7 +1069,8 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
int ix;
HashDbTerm** bp;
HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
+ HashDbTerm* free_us = NULL;
+ erts_rwmtx_t* lck;
int nitems_diff = 0;
int nkeys = 0;
Eterm key;
@@ -1203,13 +1088,14 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
if (db_eq(&tb->common,object, &b->dbterm)) {
--nitems_diff;
if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
- b->hvalue = INVALID_HASH; /* Pseudo remove */
+ b->pseudo_deleted = 1;
bp = &b->next;
b = b->next;
} else {
- *bp = b->next;
- free_term(tb, b);
- b = *bp;
+ HashDbTerm* next = b->next;
+ b->next = free_us;
+ free_us = b;
+ b = *bp = next;
}
if (tb->common.status & (DB_DUPLICATE_BAG)) {
continue;
@@ -1218,7 +1104,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
}
}
}
- else if (nitems_diff && b->hvalue != INVALID_HASH) {
+ else if (nitems_diff && !is_pseudo_deleted(b)) {
break;
}
bp = &b->next;
@@ -1226,9 +1112,10 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
- erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
+ erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
+ free_term_list(tb, free_us);
*ret = am_true;
return DB_ERROR_NONE;
}
@@ -1237,7 +1124,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
Sint slot;
int retval;
int nactive;
@@ -1264,831 +1151,1136 @@ static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
/*
- * This is just here so I can take care of the return value
- * that is to be sent during a trap (the BIF_TRAP macros explicitly returns)
+ * Match traversal callbacks
*/
-static BIF_RETTYPE bif_trap1(Export *bif,
- Process *p,
- Eterm p1)
+
+typedef struct match_callbacks_t_ match_callbacks_t;
+struct match_callbacks_t_
{
- BIF_TRAP1(bif, p, p1);
-}
-
+/* Called when no match is possible.
+ * ctx: Pointer to context
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ */
+ int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret);
+
+/* Called for each match result.
+ * ctx: Pointer to context
+ * slot_ix: Current slot index
+ * current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
+ * can be (carefully) used to adjust iteration when deleting or replacing elements.
+ * match_res: The result of running the match program against the current term.
+ *
+ * Should return 1 for successful match, 0 otherwise.
+ */
+ int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr, Eterm match_res);
+
+/* Called when either we've matched enough elements in this cycle or EOT was reached.
+ * ctx: Pointer to context
+ * slot_ix: Current slot index
+ * got: How many elements have been matched so far
+ * iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
+ * mpp: Double pointer to the compiled match program
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ * If *mpp is set to NULL, it won't be deallocated (useful for trapping).
+ */
+ int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret);
+
+/* Called when it's time to trap.
+ * ctx: Pointer to context
+ * slot_ix: Current slot index
+ * got: How many elements have been matched so far
+ * mpp: Double pointer to the compiled match program
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ * If *mpp is set to NULL, it won't be deallocated (useful for trapping).
+ */
+ int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp,
+ Eterm* ret);
+
+};
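+
As a rough illustration of how these four hooks compose (hypothetical code, not part of this change; the real implementations begin with the select_chunk_* callbacks below), a context that merely counts matching terms could look like this, with the on_trap hook, which would build a continuation tuple, omitted for brevity:

typedef struct {
    match_callbacks_t base; /* must be first so the casts below are valid */
    Process* p;
} count_only_context_t;

static int count_only_on_nothing_can_match(match_callbacks_t* ctx_base,
                                           Eterm* ret)
{
    *ret = make_small(0);
    return DB_ERROR_NONE;
}

static int count_only_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                   HashDbTerm*** current_ptr_ptr,
                                   Eterm match_res)
{
    return is_value(match_res); /* count, never modify the bucket chain */
}

static int count_only_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
                                    Sint got, Sint iterations_left,
                                    Binary** mpp, Eterm* ret)
{
    *ret = make_small(got); /* assumes 'got' fits in a small integer */
    return DB_ERROR_NONE;
}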
+
+
/*
- * Continue collecting select matches, this may happen either due to a trap
- * or when the user calls ets:select/1
+ * Begin hash table match traversal
*/
-static int db_select_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int match_traverse(Process* p, DbTableHash* tb,
+ Eterm pattern,
+ extra_match_validator_t extra_match_validator, /* Optional */
+ Sint chunk_size, /* If 0, no chunking */
+ Sint iterations_left, /* Nr. of iterations left */
+ Eterm** hpp, /* Heap */
+ int lock_for_write, /* Set to 1 if we're going to delete or
+ modify existing terms */
+ match_callbacks_t* ctx,
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- Sint slot_ix;
- Sint save_slot_ix;
- Sint chunk_size;
- int all_objects;
- Binary *mp;
- int num_left = 1000;
- HashDbTerm *current = 0;
- Eterm match_list;
- Eterm *hp;
+ Sint slot_ix; /* Slot index */
+ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
+ * the 'next' pointer in the previous term
+ */
+ HashDbTerm* saved_current; /* Helper to avoid double skip on match */
+ struct mp_info mpi;
+ unsigned current_list_pos = 0; /* Prefound buckets list index */
Eterm match_res;
- Sint got;
- Eterm *tptr;
- erts_smp_rwmtx_t* lck;
+ Sint got = 0; /* Matched terms counter */
+ erts_rwmtx_t* lck; /* Slot lock */
+ int ret_value;
+ erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
+ = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
+ void (*unlock_hash_function)(erts_rwmtx_t*)
+ = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
+ Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**)
+ = (lock_for_write ? next_slot_w : next_slot);
+
+ if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
+ != DB_ERROR_NONE)
+ {
+ *ret = NIL;
+ goto done;
+ }
-#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
+ if (!mpi.something_can_match) {
+ /* Can't possibly match anything */
+ ret_value = ctx->on_nothing_can_match(ctx, ret);
+ goto done;
+ }
- /* Decode continuation. We know it's a tuple but not the arity or anything else */
+ /*
+ * Look for initial slot / bucket
+ */
+ if (!mpi.key_given) {
+ /* Run this code if pattern is variable or GETKEY(pattern) */
+ /* is a variable */
+ slot_ix = 0;
+ lck = lock_hash_function(tb,slot_ix);
+ for (;;) {
+ ASSERT(slot_ix < NACTIVE(tb));
+ if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
+ break;
+ }
+ slot_ix = next_slot_function(tb,slot_ix,&lck);
+ if (slot_ix == 0) {
+ ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left,
+ &mpi.mp, ret);
+ goto done;
+ }
+ }
+ } else {
+ /* We have at least one */
+ slot_ix = mpi.lists[current_list_pos].ix;
+ lck = lock_hash_function(tb, slot_ix);
+ current_ptr = mpi.lists[current_list_pos].bucket;
+ ASSERT(*current_ptr == BUCKET(tb,slot_ix));
+ ++current_list_pos;
+ }
- tptr = tuple_val(continuation);
+ /*
+ * Execute traversal cycle
+ */
+ for(;;) {
+ if (*current_ptr != NULL) {
+ if (!is_pseudo_deleted(*current_ptr)) {
+ match_res = db_match_dbterm(&tb->common, p, mpi.mp,
+ &(*current_ptr)->dbterm, hpp, 2);
+ saved_current = *current_ptr;
+ if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
+ ++got;
+ }
+ --iterations_left;
+ if (*current_ptr != saved_current) {
+ /* Don't advance to next, the callback did it already */
+ continue;
+ }
+ }
+ current_ptr = &((*current_ptr)->next);
+ }
+ else if (mpi.key_given) { /* Key is bound */
+ unlock_hash_function(lck);
+ if (current_list_pos == mpi.num_lists) {
+ ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret);
+ goto done;
+ } else {
+ slot_ix = mpi.lists[current_list_pos].ix;
+ lck = lock_hash_function(tb, slot_ix);
+ current_ptr = mpi.lists[current_list_pos].bucket;
+ ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
+ ++current_list_pos;
+ }
+ }
+ else { /* Key is variable */
+ if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
+ slot_ix = -1;
+ break;
+ }
+ if (chunk_size && got >= chunk_size) {
+ unlock_hash_function(lck);
+ break;
+ }
+ if (iterations_left <= 0 || MBUF(p)) {
+ /*
+ * We have either reached our limit, or just created some heap fragments.
+ * Since many heap fragments will make the GC slower, trap and GC now.
+ */
+ unlock_hash_function(lck);
+ ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret);
+ goto done;
+ }
+ current_ptr = &BUCKET(tb,slot_ix);
+ }
+ }
- if (arityval(*tptr) != 6)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
-
- if (!is_small(tptr[2]) || !is_small(tptr[3]) || !is_binary(tptr[4]) ||
- !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- if ((chunk_size = signed_val(tptr[3])) < 0)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- mp = ((ProcBin *) binary_val(tptr[4]))->val;
- if (!IsMatchProgBinary(mp))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS;
- match_list = tptr[5];
- if ((got = signed_val(tptr[6])) < 0)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret);
- slot_ix = signed_val(tptr[2]);
- if (slot_ix < 0 /* EOT */
- || (chunk_size && got >= chunk_size)) {
- goto done; /* Already got all or enough in the match_list */
+done:
+ /* We should only jump directly to this label if
+ * we've already called ctx->nothing_can_match / loop_ended / trap
+ */
+ if (mpi.mp != NULL) {
+ erts_bin_free(mpi.mp);
}
-
- lck = RLOCK_HASH(tb,slot_ix);
- if (slot_ix >= NACTIVE(tb)) {
- RUNLOCK_HASH(lck);
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ if (mpi.lists != mpi.dlists) {
+ erts_free(ERTS_ALC_T_DB_SEL_LIST,
+ (void *) mpi.lists);
}
+ return ret_value;
- while ((current = BUCKET(tb,slot_ix)) == NULL) {
- slot_ix = next_slot(tb, slot_ix, &lck);
- if (slot_ix == 0) {
- slot_ix = -1; /* EOT */
- goto done;
- }
- }
- for(;;) {
- if (current->hvalue != INVALID_HASH &&
- (match_res = db_match_dbterm(&tb->common, p, mp, all_objects,
- &current->dbterm, &hp, 2),
- is_value(match_res))) {
-
- match_list = CONS(hp, match_res, match_list);
- ++got;
- }
+}
- --num_left;
- save_slot_ix = slot_ix;
- if ((current = next(tb, (Uint*)&slot_ix, &lck, current)) == NULL) {
- slot_ix = -1; /* EOT */
- break;
- }
- if (slot_ix != save_slot_ix) {
- if (chunk_size && got >= chunk_size) {
- RUNLOCK_HASH(lck);
- break;
- }
- if (num_left <= 0 || MBUF(p)) {
- /*
- * We have either reached our limit, or just created some heap fragments.
- * Since many heap fragments will make the GC slower, trap and GC now.
- */
- RUNLOCK_HASH(lck);
- goto trap;
- }
- }
+/*
+ * Continue hash table match traversal
+ */
+static int match_traverse_continue(Process* p, DbTableHash* tb,
+ Sint chunk_size, /* If 0, no chunking */
+ Sint iterations_left, /* Nr. of iterations left */
+ Eterm** hpp, /* Heap */
+ Sint slot_ix, /* Slot index to resume traversal from */
+ Sint got, /* Matched terms counter */
+ Binary** mpp, /* Existing match program */
+ int lock_for_write, /* Set to 1 if we're going to delete or
+ modify existing terms */
+ match_callbacks_t* ctx,
+ Eterm* ret)
+{
+ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
+ * the 'next' pointer in the previous term
+ */
+ HashDbTerm* saved_current; /* Helper to avoid double skip on match */
+ Eterm match_res;
+ erts_rwmtx_t* lck;
+ int ret_value;
+ erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
+ = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
+ void (*unlock_hash_function)(erts_rwmtx_t*)
+ = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
+ Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr)
+ = (lock_for_write ? next_slot_w : next_slot);
+
+ if (got < 0) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
+ }
+
+ if (slot_ix < 0 /* EOT */
+ || (chunk_size && got >= chunk_size))
+ {
+ /* Already got all or enough in the match_list */
+ ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
+ goto done;
}
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (chunk_size) {
- Eterm continuation;
- Eterm rest = NIL;
- Sint rest_size = 0;
-
- if (got > chunk_size) { /* Cannot write destructively here,
- the list may have
- been in user space */
- rest = NIL;
- hp = HAlloc(p, (got - chunk_size) * 2);
- while (got-- > chunk_size) {
- rest = CONS(hp, CAR(list_val(match_list)), rest);
- hp += 2;
- match_list = CDR(list_val(match_list));
- ++rest_size;
- }
- }
- if (rest != NIL || slot_ix >= 0) {
- hp = HAlloc(p,3+7);
- continuation = TUPLE6(hp, tptr[1], make_small(slot_ix),
- tptr[3], tptr[4], rest,
- make_small(rest_size));
- hp += 7;
- RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
- } else {
- if (match_list != NIL) {
- hp = HAlloc(p, 3);
- RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
- } else {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE);
- }
- }
+
+ lck = lock_hash_function(tb, slot_ix);
+ if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
+ unlock_hash_function(lck);
+ *ret = NIL;
+ ret_value = DB_ERROR_BADPARAM;
+ goto done;
}
- RET_TO_BIF(match_list,DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
+ /*
+ * Resume traversal cycle from where we left
+ */
+ current_ptr = &BUCKET(tb,slot_ix);
+ for(;;) {
+ if (*current_ptr != NULL) {
+ if (!is_pseudo_deleted(*current_ptr)) {
+ match_res = db_match_dbterm(&tb->common, p, *mpp,
+ &(*current_ptr)->dbterm, hpp, 2);
+ saved_current = *current_ptr;
+ if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
+ ++got;
+ }
+ --iterations_left;
+ if (*current_ptr != saved_current) {
+ /* Don't advance to next, the callback did it already */
+ continue;
+ }
+ }
+ current_ptr = &((*current_ptr)->next);
+ }
+ else {
+ if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
+ slot_ix = -1;
+ break;
+ }
+ if (chunk_size && got >= chunk_size) {
+ unlock_hash_function(lck);
+ break;
+ }
+ if (iterations_left <= 0 || MBUF(p)) {
+ /*
+ * We have either reached our limit, or just created some heap fragments.
+ * Since many heap fragments will make the GC slower, trap and GC now.
+ */
+ unlock_hash_function(lck);
+ ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret);
+ goto done;
+ }
+ current_ptr = &BUCKET(tb,slot_ix);
+ }
+ }
- hp = HAlloc(p,7);
- continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3],
- tptr[4], match_list, make_small(got));
- RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
-#undef RET_TO_BIF
+done:
+ /* We should only jump directly to this label if
+ * we've already called on_loop_ended / on_trap
+ */
+ return ret_value;
}
-static int db_select_hash(Process *p, DbTable *tbl,
- Eterm pattern, int reverse,
- Eterm *ret)
-{
- return db_select_chunk_hash(p, tbl, pattern, 0, reverse, ret);
-}
-static int db_select_chunk_hash(Process *p, DbTable *tbl,
- Eterm pattern, Sint chunk_size,
- int reverse, /* not used */
- Eterm *ret)
+/*
+ * Common traversal trapping/continuation code;
+ * used by select_count, select_delete and select_replace,
+ * as well as their continuation-handling counterparts.
+ */
+
+static ERTS_INLINE int on_simple_trap(Export* trap_function,
+ Process* p,
+ DbTableHash* tb,
+ Eterm tid,
+ Eterm* prev_continuation_tptr,
+ Sint slot_ix,
+ Sint got,
+ Binary** mpp,
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Sint slot_ix;
- HashDbTerm *current = 0;
- unsigned current_list_pos = 0;
- Eterm match_list;
- Eterm match_res;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
- Eterm continuation;
- int errcode;
+ Eterm* hp;
+ Eterm egot;
Eterm mpb;
- erts_smp_rwmtx_t* lck;
+ Eterm continuation;
+ int is_first_trap = (prev_continuation_tptr == NULL);
+ size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);
+ BUMP_ALL_REDS(p);
+ if (IS_USMALL(0, got)) {
+ hp = HAllocX(p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE);
+ egot = make_small(got);
+ }
+ else {
+ hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5,
+ ERTS_MAGIC_REF_THING_SIZE);
+ egot = uint_to_big(got, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+ if (is_first_trap) {
+ if (is_atom(tid))
+ tid = erts_db_make_tid(p, &tb->common);
+ mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
+ *mpp = NULL; /* otherwise the caller will destroy it */
+ }
+ else {
+ ASSERT(!is_atom(tid));
+ mpb = prev_continuation_tptr[3];
+ }
+ continuation = TUPLE4(
+ hp,
+ tid,
+ make_small(slot_ix),
+ mpb,
+ egot);
+ ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation);
+ return DB_ERROR_NONE;
+}
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
- }
+static ERTS_INLINE int unpack_simple_continuation(Eterm continuation,
+ Eterm** tptr_ptr,
+ Eterm* tid_ptr,
+ Sint* slot_ix_p,
+ Binary** mpp,
+ Sint* got_p)
+{
+ Eterm* tptr;
+ ASSERT(is_tuple(continuation));
+ tptr = tuple_val(continuation);
+ if (arityval(*tptr) != 4)
+ return 1;
- if (!mpi.something_can_match) {
- if (chunk_size) {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
- }
- RET_TO_BIF(NIL, DB_ERROR_NONE);
- /* can't possibly match anything */
+ if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
+ return 1;
}
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- slot_ix = 0;
- lck = RLOCK_HASH(tb,slot_ix);
- for (;;) {
- ASSERT(slot_ix < NACTIVE(tb));
- if ((current = BUCKET(tb,slot_ix)) != NULL) {
- break;
- }
- slot_ix = next_slot(tb,slot_ix,&lck);
- if (slot_ix == 0) {
- if (chunk_size) {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
- }
- RET_TO_BIF(NIL,DB_ERROR_NONE);
- }
- }
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(current == BUCKET(tb,slot_ix));
- ++current_list_pos;
+ *tptr_ptr = tptr;
+ *tid_ptr = tptr[1];
+ *slot_ix_p = unsigned_val(tptr[2]);
+ *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
+ if (is_big(tptr[4])) {
+ *got_p = big_to_uint32(tptr[4]);
+ }
+ else {
+ *got_p = unsigned_val(tptr[4]);
}
+ return 0;
+}
- match_list = NIL;
- for(;;) {
- if (current != NULL) {
- if (current->hvalue != INVALID_HASH) {
- match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &current->dbterm, &hp, 2);
- if (is_value(match_res)) {
- match_list = CONS(hp, match_res, match_list);
- ++got;
- }
- }
- current = current->next;
- }
- else if (mpi.key_given) { /* Key is bound */
- RUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- slot_ix = -1; /* EOT */
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- }
- else { /* Key is variable */
- --num_left;
+/*
+ *
+ * select / select_chunk match traversal
+ *
+ */
- if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
- slot_ix = -1;
- break;
- }
- if (chunk_size && got >= chunk_size) {
- RUNLOCK_HASH(lck);
- break;
- }
- if (num_left <= 0 || MBUF(p)) {
- /*
- * We have either reached our limit, or just created some heap fragments.
- * Since many heap fragments will make the GC slower, trap and GC now.
- */
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
+#define MAX_SELECT_CHUNK_ITERATIONS 1000
+
+typedef struct {
+ match_callbacks_t base;
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* hp;
+ Sint chunk_size;
+ Eterm match_list;
+ Eterm* prev_continuation_tptr;
+} select_chunk_context_t;
+
+static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret)
+{
+ select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
+ *ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
+ return DB_ERROR_NONE;
+}
+
+static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
+ if (is_value(match_res)) {
+ ctx->match_list = CONS(ctx->hp, match_res, ctx->match_list);
+ return 1;
+ }
+ return 0;
+}
+
+static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp,
+ Eterm* ret)
+{
+ select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
+ Eterm mpb;
+
+ if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
+ /* We didn't get to iterate a single time, which means EOT */
+ ASSERT(ctx->match_list == NIL);
+ *ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
+ return DB_ERROR_NONE;
+ }
+ else {
+ ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
+ BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
+ if (ctx->chunk_size) {
+ Eterm continuation;
+ Eterm rest = NIL;
+ Sint rest_size = 0;
+
+ if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */
+ Eterm tmp = ctx->match_list;
+ rest = ctx->match_list;
+ while (got-- > ctx->chunk_size + 1) {
+ tmp = CDR(list_val(tmp));
+ ++rest_size;
+ }
+ ++rest_size;
+ ctx->match_list = CDR(list_val(tmp));
+ CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
+ been in 'user space' */
+ }
+ if (rest != NIL || slot_ix >= 0) { /* Need more calls */
+ Eterm tid = ctx->tid;
+ ctx->hp = HAllocX(ctx->p,
+ 3 + 7 + ERTS_MAGIC_REF_THING_SIZE,
+ ERTS_MAGIC_REF_THING_SIZE);
+ mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp);
+ if (is_atom(tid))
+ tid = erts_db_make_tid(ctx->p,
+ &ctx->tb->common);
+ continuation = TUPLE6(
+ ctx->hp,
+ tid,
+ make_small(slot_ix),
+ make_small(ctx->chunk_size),
+ mpb, rest,
+ make_small(rest_size));
+ *mpp = NULL; /* Otherwise the caller will destroy it */
+ ctx->hp += 7;
+ *ret = TUPLE2(ctx->hp, ctx->match_list, continuation);
+ return DB_ERROR_NONE;
+ } else { /* All data is exhausted */
+ if (ctx->match_list != NIL) { /* No more data to search but still a
+ result to return to the caller */
+ ctx->hp = HAlloc(ctx->p, 3);
+ *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT);
+ return DB_ERROR_NONE;
+ } else { /* Reached the end of the table with no data to return */
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ }
}
+ *ret = ctx->match_list;
+ return DB_ERROR_NONE;
}
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (chunk_size) {
- Eterm continuation;
- Eterm rest = NIL;
- Sint rest_size = 0;
-
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- if (got > chunk_size) { /* Split list in return value and 'rest' */
- Eterm tmp = match_list;
- rest = match_list;
- while (got-- > chunk_size + 1) {
- tmp = CDR(list_val(tmp));
- ++rest_size;
- }
- ++rest_size;
- match_list = CDR(list_val(tmp));
- CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
- been in 'user space' */
- }
- if (rest != NIL || slot_ix >= 0) { /* Need more calls */
- hp = HAlloc(p,3+7+PROC_BIN_SIZE);
- mpb =db_make_mp_binary(p,(mpi.mp),&hp);
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- continuation = TUPLE6(hp, tb->common.id,make_small(slot_ix),
- make_small(chunk_size),
- mpb, rest,
- make_small(rest_size));
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- hp += 7;
- RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
- } else { /* All data is exhausted */
- if (match_list != NIL) { /* No more data to search but still a
- result to return to the caller */
- hp = HAlloc(p, 3);
- RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
- } else { /* Reached the end of the ttable with no data to return */
- RET_TO_BIF(am_EOT, DB_ERROR_NONE);
- }
- }
+}
+
+static int select_chunk_on_trap(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
+ Eterm mpb;
+ Eterm continuation;
+ Eterm* hp;
+
+ BUMP_ALL_REDS(ctx->p);
+
+ if (ctx->prev_continuation_tptr == NULL) {
+ Eterm tid = ctx->tid;
+ /* First time we're trapping */
+ hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE,
+ ERTS_MAGIC_REF_THING_SIZE);
+ if (is_atom(tid))
+ tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
+ mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp);
+ continuation = TUPLE6(
+ hp,
+ tid,
+ make_small(slot_ix),
+ make_small(ctx->chunk_size),
+ mpb,
+ ctx->match_list,
+ make_small(got));
+ *mpp = NULL; /* otherwise the caller will destroy it */
}
- RET_TO_BIF(match_list,DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- hp = HAlloc(p,7+PROC_BIN_SIZE);
- mpb =db_make_mp_binary(p,(mpi.mp),&hp);
- continuation = TUPLE6(hp, tb->common.id, make_small(slot_ix),
- make_small(chunk_size),
- mpb, match_list,
- make_small(got));
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ else {
+ /* Not the first time we're trapping; reuse continuation terms */
+ hp = HAlloc(ctx->p, 7);
+ continuation = TUPLE6(
+ hp,
+ ctx->prev_continuation_tptr[1],
+ make_small(slot_ix),
+ ctx->prev_continuation_tptr[3],
+ ctx->prev_continuation_tptr[4],
+ ctx->match_list,
+ make_small(got));
+ }
+ ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p,
+ continuation);
+ return DB_ERROR_NONE;
+}
-#undef RET_TO_BIF
+static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern,
+ int reverse, Eterm *ret)
+{
+ return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
+}
+static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reverse, Eterm *ret)
+{
+ select_chunk_context_t ctx;
+
+ ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match;
+ ctx.base.on_match_res = select_chunk_on_match_res;
+ ctx.base.on_loop_ended = select_chunk_on_loop_ended;
+    ctx.base.on_trap = select_chunk_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.hp = NULL;
+ ctx.chunk_size = chunk_size;
+ ctx.match_list = NIL;
+ ctx.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ ctx.p, ctx.tb,
+ pattern, NULL,
+ ctx.chunk_size,
+ MAX_SELECT_CHUNK_ITERATIONS,
+ &ctx.hp, 0,
+ &ctx.base, ret);
}
-static int db_select_count_hash(Process *p,
- DbTable *tbl,
- Eterm pattern,
- Eterm *ret)
+/*
+ *
+ * select_continue match traversal
+ *
+ */
+
+static
+int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp,
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Uint slot_ix = 0;
- HashDbTerm* current = NULL;
- unsigned current_list_pos = 0;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
+ select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
Eterm continuation;
- int errcode;
- Eterm egot;
- Eterm mpb;
- erts_smp_rwmtx_t* lck;
+ Eterm rest = NIL;
+ Eterm* hp;
+
+ ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
+ BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
+ if (ctx->chunk_size) {
+ Sint rest_size = 0;
+ if (got > ctx->chunk_size) {
+            /* Cannot write destructively here,
+               the list may have been in user space */
+ hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2);
+ while (got-- > ctx->chunk_size) {
+ rest = CONS(hp, CAR(list_val(ctx->match_list)), rest);
+ hp += 2;
+ ctx->match_list = CDR(list_val(ctx->match_list));
+ ++rest_size;
+ }
+ }
+ if (rest != NIL || slot_ix >= 0) {
+ hp = HAlloc(ctx->p, 3 + 7);
+ continuation = TUPLE6(
+ hp,
+ ctx->prev_continuation_tptr[1],
+ make_small(slot_ix),
+ ctx->prev_continuation_tptr[3],
+ ctx->prev_continuation_tptr[4],
+ rest,
+ make_small(rest_size));
+ hp += 7;
+ *ret = TUPLE2(hp, ctx->match_list, continuation);
+ return DB_ERROR_NONE;
+ } else {
+ if (ctx->match_list != NIL) {
+ hp = HAlloc(ctx->p, 3);
+ *ret = TUPLE2(hp, ctx->match_list, am_EOT);
+ return DB_ERROR_NONE;
+ } else {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ }
+ }
+ *ret = ctx->match_list;
+ return DB_ERROR_NONE;
+}
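This continuation round cannot repeat the destructive trick: the match_list arrived packed inside a continuation term, so its cells may be referenced from user space. Instead the leading got - chunk_size results are peeled off into freshly allocated cons cells, which leaves 'rest' in reverse order; that is harmless since ets:select on a hash table makes no ordering promise. A sketch of the copying variant, with malloc standing in for HAlloc (hypothetical types, not ERTS code):

    #include <stdlib.h>

    struct cell { int val; struct cell *next; };

    /* Non-destructive split: copy the leading got - chunk_size values of
     * '*list' into a freshly allocated 'rest' list (which comes out
     * reversed) and advance '*list' past them, leaving the original
     * cells untouched. */
    static struct cell *peel_rest(struct cell **list, int got, int chunk_size)
    {
        struct cell *rest = NULL;
        while (got-- > chunk_size) {
            struct cell *c = malloc(sizeof *c);  /* sketch: no OOM handling */
            c->val = (*list)->val;
            c->next = rest;                      /* push: reverses the order */
            rest = c;
            *list = (*list)->next;
        }
        return rest;
    }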
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+/*
+ * This is called when select traps
+ */
+static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation,
+ Eterm* ret)
+{
+ select_chunk_context_t ctx;
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size;
+ Eterm match_list;
+ Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;
+ /* Decode continuation. We know it's a tuple but not the arity or anything else */
+ ASSERT(is_tuple(continuation));
+ tptr = tuple_val(continuation);
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
- }
+ if (arityval(*tptr) != 6)
+ goto badparam;
- if (!mpi.something_can_match) {
- RET_TO_BIF(make_small(0), DB_ERROR_NONE);
- /* can't possibly match anything */
- }
+ if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
+ !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
+ goto badparam;
+ if ((chunk_size = signed_val(tptr[3])) < 0)
+ goto badparam;
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- slot_ix = 0;
- lck = RLOCK_HASH(tb,slot_ix);
- current = BUCKET(tb,slot_ix);
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(current == BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
+ mp = erts_db_get_match_prog_binary(tptr[4]);
+ if (mp == NULL)
+ goto badparam;
- for(;;) {
- if (current != NULL) {
- if (current->hvalue != INVALID_HASH) {
- if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &current->dbterm, NULL,0) == am_true) {
- ++got;
- }
- --num_left;
- }
- current = current->next;
- }
- else { /* next bucket */
- if (mpi.key_given) { /* Key is bound */
- RUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- }
- else {
- if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
- }
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, PROC_BIN_SIZE + 5);
- egot = make_small(got);
- }
- else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
- }
- mpb = db_make_mp_binary(p,mpi.mp,&hp);
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- mpb,
- egot);
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ if ((got = signed_val(tptr[6])) < 0)
+ goto badparam;
+
+ tid = tptr[1];
+ slot_ix = signed_val(tptr[2]);
+ match_list = tptr[5];
-#undef RET_TO_BIF
+ /* Proceed */
+ ctx.base.on_match_res = select_chunk_on_match_res;
+ ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended;
+ ctx.base.on_trap = select_chunk_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.hp = NULL;
+ ctx.chunk_size = chunk_size;
+ ctx.match_list = match_list;
+ ctx.prev_continuation_tptr = tptr;
+
+ return match_traverse_continue(
+ ctx.p, ctx.tb, ctx.chunk_size,
+ iterations_left, &ctx.hp, slot_ix, got, &mp, 0,
+ &ctx.base, ret);
+
+badparam:
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
}
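A continuation term can be handed back by any Erlang code, not just by the trap above, so db_select_continue_hash treats every field of the 6-tuple {Table, SlotIndex, ChunkSize, MatchProg, MatchList, Got} as hostile input and answers badparam on any anomaly. A simplified sketch of that defensive shape, with a plain struct standing in for the decoded tuple (field names are illustrative, not ERTS definitions):

    #include <stdbool.h>
    #include <stddef.h>

    /* Simplified stand-in for the decoded continuation tuple
     * {Table, SlotIndex, ChunkSize, MatchProg, MatchList, Got}. */
    struct continuation {
        size_t arity;       /* must be exactly 6                      */
        long   slot_ix;     /* next slot to scan, or < 0 when done    */
        long   chunk_size;  /* >= 0; 0 means "no chunking"            */
        void  *match_prog;  /* compiled match program; NULL if forged */
        long   got;         /* results accumulated so far; >= 0       */
    };

    /* Mirror of the badparam checks above: reject anything the trap
     * path could not have produced. */
    static bool continuation_is_sane(const struct continuation *c)
    {
        return c->arity == 6
            && c->chunk_size >= 0
            && c->match_prog != NULL
            && c->got >= 0;
    }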
-static int db_select_delete_hash(Process *p,
- DbTable *tbl,
- Eterm pattern,
- Eterm *ret)
-{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Uint slot_ix = 0;
- HashDbTerm **current = NULL;
- unsigned current_list_pos = 0;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
- Eterm continuation;
- int errcode;
- Uint last_pseudo_delete = (Uint)-1;
- Eterm mpb;
- Eterm egot;
-#ifdef ERTS_SMP
- erts_aint_t fixated_by_me = tb->common.is_thread_safe ? 0 : 1; /* ToDo: something nicer */
-#else
- erts_aint_t fixated_by_me = 0;
-#endif
- erts_smp_rwmtx_t* lck;
+#undef MAX_SELECT_CHUNK_ITERATIONS
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+/*
+ *
+ * select_count match traversal
+ *
+ */
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
- }
+#define MAX_SELECT_COUNT_ITERATIONS 1000
- if (!mpi.something_can_match) {
- RET_TO_BIF(make_small(0), DB_ERROR_NONE);
- /* can't possibly match anything */
- }
+typedef struct {
+ match_callbacks_t base;
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* prev_continuation_tptr;
+} select_count_context_t;
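All the select variants share this shape: the variant-specific context embeds the generic callback table as its first member, match_traverse() sees only the base pointer, and each callback casts it back to the full context. This is well defined in C because a pointer to a struct also points at its initial member. A minimal runnable sketch of the idiom (all names invented for illustration):

    #include <stdio.h>

    /* Generic "vtable" seen by the traversal code. */
    typedef struct callbacks {
        void (*on_done)(struct callbacks *base, int got);
    } callbacks_t;

    /* Variant context embeds the vtable as its FIRST member, so a pointer
     * to the context and a pointer to its base member are interchangeable. */
    typedef struct {
        callbacks_t base;
        const char *name;
    } count_context_t;

    static void count_on_done(callbacks_t *base, int got)
    {
        count_context_t *ctx = (count_context_t *) base;  /* downcast */
        printf("%s matched %d objects\n", ctx->name, got);
    }

    /* Generic driver: knows only the base type, like match_traverse(). */
    static void traverse(callbacks_t *cb)
    {
        cb->on_done(cb, 42);
    }

    int main(void)
    {
        count_context_t ctx = { { count_on_done }, "select_count" };
        traverse(&ctx.base);
        return 0;
    }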
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- lck = WLOCK_HASH(tb,slot_ix);
- current = &BUCKET(tb,slot_ix);
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = WLOCK_HASH(tb, slot_ix);
- current = mpi.lists[current_list_pos++].bucket;
- ASSERT(*current == BUCKET(tb,slot_ix));
- }
+static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base,
+ Eterm* ret)
+{
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
+static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ return (match_res == am_true);
+}
- for(;;) {
- if ((*current) == NULL) {
- if (mpi.key_given) { /* Key is bound */
- WUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = WLOCK_HASH(tb, slot_ix);
- current = mpi.lists[current_list_pos].bucket;
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- } else {
- if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- WUNLOCK_HASH(lck);
- goto trap;
- }
- current = &BUCKET(tb,slot_ix);
- }
- }
- else if ((*current)->hvalue == INVALID_HASH) {
- current = &((*current)->next);
- }
- else {
- int did_erase = 0;
- if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &(*current)->dbterm, NULL, 0) == am_true) {
- HashDbTerm *del;
- if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
- if (slot_ix != last_pseudo_delete) {
- if (!add_fixed_deletion(tb, slot_ix, fixated_by_me))
- goto do_erase;
- last_pseudo_delete = slot_ix;
- }
- (*current)->hvalue = INVALID_HASH;
- } else {
- do_erase:
- del = *current;
- *current = (*current)->next;
- free_term(tb, del);
- did_erase = 1;
- }
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- ++got;
- }
- --num_left;
- if (!did_erase) {
- current = &((*current)->next);
- }
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (got) {
- try_shrink(tb);
- }
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, PROC_BIN_SIZE + 5);
- egot = make_small(got);
- }
- else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
+static int select_count_on_loop_ended(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp,
+ Eterm* ret)
+{
+ select_count_context_t* ctx = (select_count_context_t*) ctx_base;
+ ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
+ BUMP_REDS(ctx->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left);
+ *ret = erts_make_integer(got, ctx->p);
+ return DB_ERROR_NONE;
+}
+
+static int select_count_on_trap(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ select_count_context_t* ctx = (select_count_context_t*) ctx_base;
+ return on_simple_trap(
+ &ets_select_count_continue_exp,
+ ctx->p,
+ ctx->tb,
+ ctx->tid,
+ ctx->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
+}
+
+static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ select_count_context_t ctx;
+ Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS;
+ Sint chunk_size = 0;
+
+ ctx.base.on_nothing_can_match = select_count_on_nothing_can_match;
+ ctx.base.on_match_res = select_count_on_match_res;
+ ctx.base.on_loop_ended = select_count_on_loop_ended;
+ ctx.base.on_trap = select_count_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ ctx.p, ctx.tb,
+ pattern, NULL,
+ chunk_size, iterations_left, NULL, 0,
+ &ctx.base, ret);
+}
+
+/*
+ * This is called when select_count traps
+ */
+static int db_select_count_continue_hash(Process* p, DbTable* tbl,
+ Eterm continuation, Eterm* ret)
+{
+ select_count_context_t ctx;
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+ *ret = NIL;
+
+ if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ return DB_ERROR_BADPARAM;
}
- mpb = db_make_mp_binary(p,mpi.mp,&hp);
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- mpb,
- egot);
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
-#undef RET_TO_BIF
+ ctx.base.on_match_res = select_count_on_match_res;
+ ctx.base.on_loop_ended = select_count_on_loop_ended;
+ ctx.base.on_trap = select_count_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = tptr;
+ return match_traverse_continue(
+ ctx.p, ctx.tb, chunk_size,
+ MAX_SELECT_COUNT_ITERATIONS,
+ NULL, slot_ix, got, &mp, 0,
+ &ctx.base, ret);
}
+
+#undef MAX_SELECT_COUNT_ITERATIONS
+
+
/*
-** This is called when select_delete traps
-*/
-static int db_select_delete_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+ *
+ * select_delete match traversal
+ *
+ */
+
+#define MAX_SELECT_DELETE_ITERATIONS 1000
+
+typedef struct {
+ match_callbacks_t base;
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* prev_continuation_tptr;
+ erts_aint_t fixated_by_me;
+ Uint last_pseudo_delete;
+ HashDbTerm* free_us;
+} select_delete_context_t;
+
+static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base,
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- Uint slot_ix;
- Uint last_pseudo_delete = (Uint)-1;
- HashDbTerm **current = NULL;
- Eterm *hp;
- int num_left = 1000;
- Uint got;
- Eterm *tptr;
- Binary *mp;
- Eterm egot;
- int fixated_by_me = ONLY_WRITER(p,tb) ? 0 : 1; /* ToDo: something nicer */
- erts_smp_rwmtx_t* lck;
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
-#define RET_TO_BIF(Term,RetVal) do { \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ HashDbTerm** current_ptr = *current_ptr_ptr;
+ select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
+ HashDbTerm* del;
+ if (match_res != am_true)
+ return 0;
-
- tptr = tuple_val(continuation);
- slot_ix = unsigned_val(tptr[2]);
- mp = ((ProcBin *) binary_val(tptr[3]))->val;
- if (is_big(tptr[4])) {
- got = big_to_uint32(tptr[4]);
- } else {
- got = unsigned_val(tptr[4]);
+ if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */
+ if (slot_ix != ctx->last_pseudo_delete) {
+ if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me))
+ goto do_erase;
+ ctx->last_pseudo_delete = slot_ix;
+ }
+ (*current_ptr)->pseudo_deleted = 1;
}
-
- lck = WLOCK_HASH(tb,slot_ix);
- if (slot_ix >= NACTIVE(tb)) {
- WUNLOCK_HASH(lck);
- goto done;
- }
- current = &BUCKET(tb,slot_ix);
-
- for(;;) {
- if ((*current) == NULL) {
- if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- WUNLOCK_HASH(lck);
- goto trap;
- }
- current = &BUCKET(tb,slot_ix);
- }
- else if ((*current)->hvalue == INVALID_HASH) {
- current = &((*current)->next);
- }
- else {
- int did_erase = 0;
- if (db_match_dbterm(&tb->common, p, mp, 0,
- &(*current)->dbterm, NULL, 0) == am_true) {
- HashDbTerm *del;
- if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
- if (slot_ix != last_pseudo_delete) {
- if (!add_fixed_deletion(tb, slot_ix, fixated_by_me))
- goto do_erase;
- last_pseudo_delete = slot_ix;
- }
- (*current)->hvalue = INVALID_HASH;
- } else {
- do_erase:
- del = *current;
- *current = (*current)->next;
- free_term(tb, del);
- did_erase = 1;
- }
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- ++got;
- }
-
- --num_left;
- if (!did_erase) {
- current = &((*current)->next);
- }
- }
+ else {
+ do_erase:
+ del = *current_ptr;
+        *current_ptr = (*current_ptr)->next; /* unlink term; predecessor now skips it */
+ del->next = ctx->free_us;
+ ctx->free_us = del;
}
-done:
- BUMP_REDS(p, 1000 - num_left);
+ erts_atomic_dec_nob(&ctx->tb->common.nitems);
+
+ return 1;
+}
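The callback receives a HashDbTerm*** precisely so that unlinking works the same whether the match sits at the head of a bucket or in the middle, and so the generic loop can see where the cursor ended up. A standalone sketch of the two deletion paths above, pseudo-delete versus unlink onto a deferred free list (hypothetical types, not ERTS code):

    #include <stdbool.h>
    #include <stdio.h>

    struct term { int key; bool pseudo_deleted; struct term *next; };

    /* Delete the term at *pp from its chain.  With 'fixed' set, only mark
     * it, so concurrent iterators keep seeing a stable chain; otherwise
     * unlink it onto 'free_us' for reclamation after the lock is dropped.
     * Using a term** handles head and middle positions uniformly. */
    static void delete_at(struct term **pp, bool fixed, struct term **free_us)
    {
        if (fixed) {
            (*pp)->pseudo_deleted = true;
        } else {
            struct term *del = *pp;
            *pp = del->next;        /* unlink: predecessor now skips 'del' */
            del->next = *free_us;   /* push onto the deferred free list */
            *free_us = del;
        }
    }

    int main(void)
    {
        struct term c = {3, false, NULL}, b = {2, false, &c}, a = {1, false, &b};
        struct term *head = &a, *free_us = NULL;
        delete_at(&head->next, false, &free_us);  /* really remove key 2 */
        delete_at(&head, true, &free_us);         /* only mark key 1 */
        printf("head=%d (marked=%d), next=%d\n",
               head->key, head->pseudo_deleted, head->next->key); /* 1 (1), 3 */
        return 0;
    }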
+
+static int select_delete_on_loop_ended(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp,
+ Eterm* ret)
+{
+ select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
+ free_term_list(ctx->tb, ctx->free_us);
+ ctx->free_us = NULL;
+ ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
+ BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);
if (got) {
- try_shrink(tb);
- }
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, 5);
- egot = make_small(got);
- }
- else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
+ try_shrink(ctx->tb);
}
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- tptr[3],
- egot);
- RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ *ret = erts_make_integer(got, ctx->p);
+ return DB_ERROR_NONE;
+}
-#undef RET_TO_BIF
+static int select_delete_on_trap(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
+ free_term_list(ctx->tb, ctx->free_us);
+ ctx->free_us = NULL;
+ return on_simple_trap(
+ &ets_select_delete_continue_exp,
+ ctx->p,
+ ctx->tb,
+ ctx->tid,
+ ctx->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
+}
+static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ select_delete_context_t ctx;
+ Sint chunk_size = 0;
+
+ ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match;
+ ctx.base.on_match_res = select_delete_on_match_res;
+ ctx.base.on_loop_ended = select_delete_on_loop_ended;
+ ctx.base.on_trap = select_delete_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = NULL;
+ ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
+ ctx.last_pseudo_delete = (Uint) -1;
+ ctx.free_us = NULL;
+
+ return match_traverse(
+ ctx.p, ctx.tb,
+ pattern, NULL,
+ chunk_size,
+ MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
+ &ctx.base, ret);
}
-
+
/*
-** This is called when select_count traps
-*/
-static int db_select_count_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+ * This is called when select_delete traps
+ */
+static int db_select_delete_continue_hash(Process* p, DbTable* tbl,
+ Eterm continuation, Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- Uint slot_ix;
- HashDbTerm* current;
- Eterm *hp;
- int num_left = 1000;
- Uint got;
- Eterm *tptr;
- Binary *mp;
- Eterm egot;
- erts_smp_rwmtx_t* lck;
+ select_delete_context_t ctx;
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+
+ if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
+ }
+
+ ctx.base.on_match_res = select_delete_on_match_res;
+ ctx.base.on_loop_ended = select_delete_on_loop_ended;
+ ctx.base.on_trap = select_delete_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = tptr;
+ ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */
+ ctx.last_pseudo_delete = (Uint) -1;
+ ctx.free_us = NULL;
+
+ return match_traverse_continue(
+ ctx.p, ctx.tb, chunk_size,
+ MAX_SELECT_DELETE_ITERATIONS,
+ NULL, slot_ix, got, &mp, 1,
+ &ctx.base, ret);
+}
-#define RET_TO_BIF(Term,RetVal) do { \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+#undef MAX_SELECT_DELETE_ITERATIONS
-
- tptr = tuple_val(continuation);
- slot_ix = unsigned_val(tptr[2]);
- mp = ((ProcBin *) binary_val(tptr[3]))->val;
- if (is_big(tptr[4])) {
- got = big_to_uint32(tptr[4]);
- } else {
- got = unsigned_val(tptr[4]);
- }
-
- lck = RLOCK_HASH(tb, slot_ix);
- if (slot_ix >= NACTIVE(tb)) { /* Is this posible? */
- RUNLOCK_HASH(lck);
- goto done;
- }
- current = BUCKET(tb,slot_ix);
-
- for(;;) {
- if (current != NULL) {
- if (current->hvalue == INVALID_HASH) {
- current = current->next;
- continue;
- }
- if (db_match_dbterm(&tb->common, p, mp, 0, &current->dbterm,
- NULL, 0) == am_true) {
- ++got;
- }
- --num_left;
- current = current->next;
- }
- else { /* next bucket */
- if ((slot_ix = next_slot(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, 5);
- egot = make_small(got);
- }
- else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
+/*
+ *
+ * select_replace match traversal
+ *
+ */
+
+#define MAX_SELECT_REPLACE_ITERATIONS 1000
+
+typedef struct {
+ match_callbacks_t base;
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* prev_continuation_tptr;
+} select_replace_context_t;
+
+static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base,
+ Eterm* ret)
+{
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
+
+static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
+ DbTableHash* tb = ctx->tb;
+ HashDbTerm* new;
+ HashDbTerm* next;
+ HashValue hval;
+
+ if (is_value(match_res)) {
+#ifdef DEBUG
+ Eterm key = db_getkey(tb->common.keypos, match_res);
+ ASSERT(is_value(key));
+ ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
+#endif
+ next = (**current_ptr_ptr)->next;
+ hval = (**current_ptr_ptr)->hvalue;
+ new = new_dbterm(tb, match_res);
+ new->next = next;
+ new->hvalue = hval;
+ new->pseudo_deleted = 0;
+ free_term(tb, **current_ptr_ptr);
+ **current_ptr_ptr = new; /* replace 'next' pointer in previous object */
+ *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
+ return 1;
}
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- tptr[3],
- egot);
- RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ return 0;
+}
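Replacement is thus a splice: the new term inherits the old term's chain position and hash value, and only the predecessor's pointer is redirected; db_match_keeps_key has already guaranteed that the key, and therefore the hash, cannot have changed. A minimal sketch of the splice (hypothetical types, not ERTS code):

    #include <stdio.h>
    #include <stdlib.h>

    struct term { int key; int val; struct term *next; };

    /* Splice a new node in place of the node at *pp, preserving its chain
     * position and key -- the shape of the replacement above, where the
     * preserved fields are 'next' and 'hvalue'. */
    static void replace_at(struct term **pp, int new_val)
    {
        struct term *old = *pp;
        struct term *new = malloc(sizeof *new);  /* sketch: no OOM handling */
        new->key  = old->key;    /* key (and thus hash) must not change */
        new->val  = new_val;
        new->next = old->next;   /* inherit the position in the chain */
        *pp = new;               /* predecessor now points at the new node */
        free(old);
    }

    int main(void)
    {
        struct term *head = malloc(sizeof *head);
        head->key = 7; head->val = 1; head->next = NULL;
        replace_at(&head, 99);
        printf("%d -> %d\n", head->key, head->val);  /* 7 -> 99 */
        free(head);
        return 0;
    }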
-#undef RET_TO_BIF
+static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
+ Sint got, Sint iterations_left,
+ Binary** mpp, Eterm* ret)
+{
+ select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
+ ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
+ /* the more objects we've replaced, the more reductions we've consumed */
+ BUMP_REDS(ctx->p,
+ MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
+ (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got));
+ *ret = erts_make_integer(got, ctx->p);
+ return DB_ERROR_NONE;
+}
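For a feel of the charge: with the 1000-iteration budget, a round that exhausts its iterations (iterations_left == 0) after replacing 300 objects costs MIN(2 * 1000, (1000 - 0) + 300) = 1300 reductions, while a round that inspected the same number of objects but replaced none costs the plain 1000.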
+static int select_replace_on_trap(match_callbacks_t* ctx_base,
+ Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
+ return on_simple_trap(
+ &ets_select_replace_continue_exp,
+ ctx->p,
+ ctx->tb,
+ ctx->tid,
+ ctx->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
}
-
+
+static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
+{
+ select_replace_context_t ctx;
+ Sint chunk_size = 0;
+
+    /* The bag table type presented both semantic-consistency and performance
+     * issues, so select_replace is unsupported for it for now.
+     */
+ ASSERT(!(tbl->hash.common.status & DB_BAG));
+
+ ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match;
+ ctx.base.on_match_res = select_replace_on_match_res;
+ ctx.base.on_loop_ended = select_replace_on_loop_ended;
+ ctx.base.on_trap = select_replace_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ ctx.p, ctx.tb,
+ pattern, db_match_keeps_key,
+ chunk_size,
+ MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
+ &ctx.base, ret);
+}
+
+/*
+ * This is called when select_replace traps
+ */
+static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
+{
+ select_replace_context_t ctx;
+ Eterm* tptr;
+    Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+ *ret = NIL;
+
+ if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ return DB_ERROR_BADPARAM;
+ }
+
+ /* Proceed */
+ ctx.base.on_match_res = select_replace_on_match_res;
+ ctx.base.on_loop_ended = select_replace_on_loop_ended;
+ ctx.base.on_trap = select_replace_on_trap;
+ ctx.p = p;
+ ctx.tb = &tbl->hash;
+ ctx.tid = tid;
+ ctx.prev_continuation_tptr = tptr;
+
+ return match_traverse_continue(
+ ctx.p, ctx.tb, chunk_size,
+ MAX_SELECT_REPLACE_ITERATIONS,
+ NULL, slot_ix, got, &mp, 1,
+ &ctx.base, ret);
+}
+
+
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp, *b;
+ HashDbTerm *free_us = NULL;
HashValue hval = MAKE_HASH(key);
- erts_smp_rwmtx_t *lck = WLOCK_HASH(tb, hval);
+ erts_rwmtx_t *lck = WLOCK_HASH(tb, hval);
int ix = hash_to_ix(tb, hval);
int nitems_diff = 0;
@@ -2104,12 +2296,13 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
bp = &b->next;
- b->hvalue = INVALID_HASH;
+ b->pseudo_deleted = 1;
b = b->next;
} else {
- *bp = b->next;
- free_term(tb, b);
- b = *bp;
+ HashDbTerm* next = b->next;
+ b->next = free_us;
+ free_us = b;
+ b = *bp = next;
}
}
break;
@@ -2117,12 +2310,14 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
- erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
+ erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
+ free_term_list(tb, free_us);
return DB_ERROR_NONE;
}
+
/*
** Other interface routines (not directly coupled to one bif)
*/
@@ -2132,30 +2327,56 @@ void db_initialize_hash(void)
}
-int db_mark_all_deleted_hash(DbTable *tbl)
+static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
+ const int LOOPS_PER_REDUCTION = 8;
DbTableHash *tb = &tbl->hash;
- HashDbTerm* list;
+ FixedDeletion* fixdel;
+ SWord loops = reds * LOOPS_PER_REDUCTION;
int i;
- ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));
+ ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb));
- for (i = 0; i < NACTIVE(tb); i++) {
- if ((list = BUCKET(tb,i)) != NULL) {
- add_fixed_deletion(tb, i, 0);
- do {
- list->hvalue = INVALID_HASH;
- list = list->next;
- }while(list != NULL);
- }
+ fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel);
+ if (fixdel && fixdel->trap) {
+ /* Continue after trap */
+ ASSERT(fixdel->all);
+ ASSERT(fixdel->slot < NACTIVE(tb));
+ i = fixdel->slot;
}
- erts_smp_atomic_set_nob(&tb->common.nitems, 0);
- return DB_ERROR_NONE;
+ else {
+ /* First call */
+ int ok;
+ fixdel = alloc_fixdel(tb);
+ ok = link_fixdel(tb, fixdel, 0);
+ ASSERT(ok); (void)ok;
+ i = 0;
+ }
+
+ do {
+ HashDbTerm* b;
+ for (b = BUCKET(tb,i); b; b = b->next)
+ b->pseudo_deleted = 1;
+ } while (++i < NACTIVE(tb) && --loops > 0);
+
+ if (i < NACTIVE(tb)) {
+ /* Yield */
+ fixdel->slot = i;
+ fixdel->all = 0;
+ fixdel->trap = 1;
+ return -1;
+ }
+
+ fixdel->slot = NACTIVE(tb) - 1;
+ fixdel->all = 1;
+ fixdel->trap = 0;
+ erts_atomic_set_nob(&tb->common.nitems, 0);
+ return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
}
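The function above is the yielding pattern used throughout this file: convert the reduction budget into a loop budget, record the resume point (here, inside the FixedDeletion record) and return -1 to request another call, or return the unspent reductions when finished. A runnable sketch of the protocol (simplified, not ERTS code):

    #include <stdio.h>

    #define LOOPS_PER_REDUCTION 8

    /* Resumable pass over 'n' slots: do at most reds*LOOPS_PER_REDUCTION
     * slots, stash progress in *slot and return -1 to ask to be called
     * again, or return the unspent reductions when done. */
    static long mark_all(int *slot, int n, long reds)
    {
        long loops = reds * LOOPS_PER_REDUCTION;
        int i = *slot;
        do {
            /* ... mark every term in slot i pseudo-deleted ... */
        } while (++i < n && --loops > 0);
        if (i < n) {
            *slot = i;            /* remember where to resume */
            return -1;            /* yield: budget exhausted */
        }
        return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
    }

    int main(void)
    {
        int slot = 0;
        long reds;
        while ((reds = mark_all(&slot, 100, 4)) == -1)
            printf("yielded at slot %d\n", slot);
        printf("done, %ld reductions left\n", reds);
        return 0;
    }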
/* Display hash table contents (for dump) */
-static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
+static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
DbHashStats stats;
@@ -2163,7 +2384,6 @@ static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));
-#ifdef ERTS_SMP
i = tbl->common.is_thread_safe;
/* If crash dumping we set table to thread safe in order to
avoid taking any locks */
@@ -2173,9 +2393,6 @@ static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
db_calc_stats_hash(&tbl->hash, &stats);
tbl->common.is_thread_safe = i;
-#else
- db_calc_stats_hash(&tbl->hash, &stats);
-#endif
erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
@@ -2197,7 +2414,7 @@ static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
continue;
erts_print(to, to_arg, "%d: [", i);
while(list != 0) {
- if (list->hvalue == INVALID_HASH)
+ if (is_pseudo_deleted(list))
erts_print(to, to_arg, "*");
if (tb->common.compress) {
Eterm key = GETKEY(tb, list->dbterm.tpl);
@@ -2216,50 +2433,42 @@ static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
}
}
-/* release all memory occupied by a single table */
-static int db_free_table_hash(DbTable *tbl)
+static int db_free_empty_table_hash(DbTable *tbl)
{
- while (!db_free_table_continue_hash(tbl))
+ ASSERT(NITEMS(tbl) == 0);
+ while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0)
;
return 0;
}
-static int db_free_table_continue_hash(DbTable *tbl)
+static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
DbTableHash *tb = &tbl->hash;
- int done;
- FixedDeletion* fixdel = (FixedDeletion*) erts_smp_atomic_read_acqb(&tb->fixdel);
- ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));
+ FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel);
+ ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));
- done = 0;
while (fixdel != NULL) {
FixedDeletion *fx = fixdel;
fixdel = fx->next;
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- (void *) fx,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
- if (++done >= 2*DELETE_RECORD_LIMIT) {
- erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
- return 0; /* Not done */
+ free_fixdel(tb, fx);
+ if (--reds < 0) {
+ erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
+ return reds; /* Not done */
}
}
- erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);
+ erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);
- done /= 2;
while(tb->nslots != 0) {
- free_seg(tb, 1);
+ reds -= EXT_SEGSZ/64 + free_seg(tb, 1);
/*
* If we have done enough work, get out here.
*/
- if (++done >= (DELETE_RECORD_LIMIT / CHAIN_LEN / SEGSZ)) {
- return 0; /* Not done */
+ if (reds < 0) {
+ return reds; /* Not done */
}
}
-#ifdef ERTS_SMP
if (tb->locks != NULL) {
int i;
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
@@ -2269,9 +2478,8 @@ static int db_free_table_continue_hash(DbTable *tbl)
(void*)tb->locks, sizeof(DbTableHashFineLocks));
tb->locks = NULL;
}
-#endif
- ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
- return 1; /* Done */
+ ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
+ return reds; /* Done */
}
@@ -2284,7 +2492,8 @@ static int db_free_table_continue_hash(DbTable *tbl)
** slots should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
- struct mp_info *mpi)
+ extra_match_validator_t extra_validator, /* Optional callback */
+ struct mp_info *mpi)
{
Eterm *ptpl;
Eterm lst, tpl, ttpl;
@@ -2300,7 +2509,6 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
mpi->num_lists = 0;
mpi->key_given = 1;
mpi->something_can_match = 0;
- mpi->all_objects = 1;
mpi->mp = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
@@ -2322,7 +2530,10 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
i = 0;
for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
- Eterm body;
+ Eterm match;
+ Eterm guard;
+ Eterm body;
+
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
if (buff != sbuff) {
@@ -2337,12 +2548,19 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
}
return DB_ERROR_BADPARAM;
}
- matches[i] = tpl = ptpl[1];
- guards[i] = ptpl[2];
+ matches[i] = match = tpl = ptpl[1];
+ guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
+
+ if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_BADPARAM;
+ }
+
if (!is_list(body) || CDR(list_val(body)) != NIL ||
CAR(list_val(body)) != am_DollarUnderscore) {
- mpi->all_objects = 0;
}
++i;
if (!(mpi->key_given)) {
@@ -2357,7 +2575,7 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
if (!db_has_variable(key)) { /* Bound key */
int ix, search_slot;
HashDbTerm** bp;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
@@ -2414,69 +2632,58 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
return DB_ERROR_NONE;
}
-static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
- struct segment** old_segtab)
+static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
- int nsegs;
- struct ext_segment* eseg;
+ struct segment** old_segtab = SEGTAB(tb);
+ int nsegs = 0;
+ struct ext_segtab* est;
+ ASSERT(seg_ix >= NSEG_1);
switch (seg_ix) {
- case 0: nsegs = NSEG_1; break;
- case 1: nsegs = NSEG_2; break;
- default: nsegs = seg_ix + NSEG_INC; break;
- }
- eseg = (struct ext_segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
- (DbTable *) tb,
- SIZEOF_EXTSEG(nsegs));
- ASSERT(eseg != NULL);
- sys_memset(&eseg->s, 0, sizeof(struct segment));
- IF_DEBUG(eseg->s.is_ext_segment = 1);
- eseg->prev_segtab = old_segtab;
- eseg->nsegs = nsegs;
- if (old_segtab) {
- ASSERT(nsegs > tb->nsegs);
- sys_memcpy(eseg->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
- }
+ case NSEG_1: nsegs = NSEG_2; break;
+ default: nsegs = seg_ix + NSEG_INC; break;
+ }
+ ASSERT(nsegs > tb->nsegs);
+ est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_EXT_SEGTAB(nsegs));
+ est->nsegs = nsegs;
+ est->prev_segtab = old_segtab;
+ est->prev_nsegs = tb->nsegs;
+ sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
- sys_memset(&eseg->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
+ sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
- eseg->segtab[seg_ix] = &eseg->s;
- return eseg;
+ return est;
}
/* Extend table with one new segment
*/
-static int alloc_seg(DbTableHash *tb)
+static void alloc_seg(DbTableHash *tb)
{
- int seg_ix = tb->nslots >> SEGSZ_EXP;
-
- if (seg_ix+1 == tb->nsegs) { /* New segtab needed (extended segment) */
- struct segment** segtab = SEGTAB(tb);
- struct ext_segment* seg = alloc_ext_seg(tb, seg_ix, segtab);
- if (seg == NULL) return 0;
- segtab[seg_ix] = &seg->s;
- /* We don't use the new segtab until next call (see "shrink race") */
- }
- else { /* Just a new plain segment */
- struct segment** segtab;
- if (seg_ix == tb->nsegs) { /* Time to start use segtab from last call */
- struct ext_segment* eseg;
- eseg = (struct ext_segment*) SEGTAB(tb)[seg_ix-1];
- MY_ASSERT(eseg!=NULL && eseg->s.is_ext_segment);
- SET_SEGTAB(tb, eseg->segtab);
- tb->nsegs = eseg->nsegs;
- }
- ASSERT(seg_ix < tb->nsegs);
- segtab = SEGTAB(tb);
- ASSERT(segtab[seg_ix] == NULL);
- segtab[seg_ix] = (struct segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
- (DbTable *) tb,
- sizeof(struct segment));
- if (segtab[seg_ix] == NULL) return 0;
- sys_memset(segtab[seg_ix], 0, sizeof(struct segment));
- }
- tb->nslots += SEGSZ;
- return 1;
+ int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
+ struct segment** segtab;
+
+ ASSERT(seg_ix > 0);
+ if (seg_ix == tb->nsegs) { /* New segtab needed */
+ struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
+ SET_SEGTAB(tb, est->segtab);
+ tb->nsegs = est->nsegs;
+ }
+ ASSERT(seg_ix < tb->nsegs);
+ segtab = SEGTAB(tb);
+ segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_SEGMENT(EXT_SEGSZ));
+ sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ));
+ tb->nslots += EXT_SEGSZ;
+}
+
+static void dealloc_ext_segtab(void* lop_data)
+{
+ struct ext_segtab* est = (struct ext_segtab*) lop_data;
+
+ erts_free(ERTS_ALC_T_DB_SEG, est);
}
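Because the first segment holds FIRST_SEGSZ (256) slots while every later segment holds EXT_SEGSZ (2048), mapping a slot index to a segment index needs the bias applied by SLOT_IX_TO_SEG_IX. A worked check of that arithmetic (constants copied from the tuning section; the helper name is invented):

    #include <assert.h>

    #define FIRST_SEGSZ_EXP 8
    #define FIRST_SEGSZ     (1 << FIRST_SEGSZ_EXP)  /* 256-slot first segment  */
    #define EXT_SEGSZ_EXP   11
    #define EXT_SEGSZ       (1 << EXT_SEGSZ_EXP)    /* 2048-slot extra segments */

    /* Same mapping as SLOT_IX_TO_SEG_IX: bias the slot index so the short
     * first segment lines up with the 2048-slot grid, then divide. */
    static int slot_to_seg(int i)
    {
        return (i + (EXT_SEGSZ - FIRST_SEGSZ)) >> EXT_SEGSZ_EXP;
    }

    int main(void)
    {
        assert(slot_to_seg(0)    == 0);   /* first segment: slots 0..255   */
        assert(slot_to_seg(255)  == 0);
        assert(slot_to_seg(256)  == 1);   /* segment 1: slots 256..2303    */
        assert(slot_to_seg(2303) == 1);
        assert(slot_to_seg(2304) == 2);   /* segment 2 starts here         */
        return 0;
    }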
/* Shrink table by freeing the top segment
@@ -2484,20 +2691,20 @@ static int alloc_seg(DbTableHash *tb)
*/
static int free_seg(DbTableHash *tb, int free_records)
{
- const int seg_ix = (tb->nslots >> SEGSZ_EXP) - 1;
+ const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
struct segment** const segtab = SEGTAB(tb);
- struct ext_segment* const top = (struct ext_segment*) segtab[seg_ix];
- int bytes;
+ struct segment* const segp = segtab[seg_ix];
+ Uint seg_sz;
int nrecords = 0;
- ASSERT(top != NULL);
+ ASSERT(segp != NULL);
#ifndef DEBUG
if (free_records)
#endif
{
- int i;
- for (i=0; i<SEGSZ; ++i) {
- HashDbTerm* p = top->s.buckets[i];
+ int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
+ while (i--) {
+ HashDbTerm* p = segp->buckets[i];
while(p != 0) {
HashDbTerm* nxt = p->next;
ASSERT(free_records); /* segment not empty as assumed? */
@@ -2507,55 +2714,44 @@ static int free_seg(DbTableHash *tb, int free_records)
}
}
}
-
- /* The "shrink race":
- * We must avoid deallocating an extended segment while its segtab may
- * still be used by other threads.
- * The trick is to stop use a segtab one call earlier. That is, stop use
- * a segtab when the segment above it is deallocated. When the segtab is
- * later deallocated, it has not been used for a very long time.
- * It is even theoretically safe as we have by then rehashed the entire
- * segment, seizing *all* locks, so there cannot exist any retarded threads
- * still hanging in BUCKET macro with an old segtab pointer.
- * For this to work, we must of course allocate a new segtab one call
- * earlier in alloc_seg() as well. And this is also the reason why
- * the minimum size of the first segtab is 2 and not 1 (NSEG_1).
- */
- if (seg_ix == tb->nsegs-1 || seg_ix==0) { /* Dealloc extended segment */
- MY_ASSERT(top->s.is_ext_segment);
- ASSERT(segtab != top->segtab || seg_ix==0);
- bytes = SIZEOF_EXTSEG(top->nsegs);
- }
- else { /* Dealloc plain segment */
- struct ext_segment* newtop = (struct ext_segment*) segtab[seg_ix-1];
- MY_ASSERT(!top->s.is_ext_segment);
-
- if (segtab == newtop->segtab) { /* New top segment is extended */
- MY_ASSERT(newtop->s.is_ext_segment);
- if (newtop->prev_segtab != NULL) {
- /* Time to use a smaller segtab */
- SET_SEGTAB(tb, newtop->prev_segtab);
- tb->nsegs = seg_ix;
- ASSERT(tb->nsegs == EXTSEG(SEGTAB(tb))->nsegs);
- }
- else {
- ASSERT(NSEG_1 > 2 && seg_ix==1);
- }
- }
- bytes = sizeof(struct segment);
+ if (seg_ix >= NSEG_1) {
+ struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);
+
+ if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
+ ASSERT(est->prev_segtab != NULL);
+ SET_SEGTAB(tb, est->prev_segtab);
+ tb->nsegs = est->prev_nsegs;
+
+ if (!tb->common.is_thread_safe) {
+ /*
+ * Table is doing a graceful shrink operation and we must avoid
+ * deallocating this segtab while it may still be read by other
+ * threads. Schedule deallocation with thread progress to make
+ * sure no lingering threads are still hanging in BUCKET macro
+ * with an old segtab pointer.
+ */
+ Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
+ ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
+ erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
+ est,
+ &est->lop,
+ sz);
+ }
+ else
+ erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
+ SIZEOF_EXT_SEGTAB(est->nsegs));
+ }
}
+ seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
+ erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));
- erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
- (void*)top, bytes);
#ifdef DEBUG
- if (seg_ix > 0) {
- segtab[seg_ix] = NULL;
- } else {
- SET_SEGTAB(tb, NULL);
- }
+ if (seg_ix < tb->nsegs)
+ SEGTAB(tb)[seg_ix] = NULL;
#endif
- tb->nslots -= SEGSZ;
+ tb->nslots -= seg_sz;
ASSERT(tb->nslots >= 0);
return nrecords;
}
@@ -2578,7 +2774,7 @@ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
if (!sz) {
ptr = ptr1;
while(ptr != ptr2) {
- if (ptr->hvalue != INVALID_HASH)
+ if (!is_pseudo_deleted(ptr))
sz += ptr->dbterm.size + 2;
ptr = ptr->next;
}
@@ -2589,7 +2785,7 @@ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
ptr = ptr1;
while(ptr != ptr2) {
- if (ptr->hvalue != INVALID_HASH) {
+ if (!is_pseudo_deleted(ptr)) {
copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p));
list = CONS(hp, copy, list);
hp += 2;
@@ -2605,103 +2801,106 @@ static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
if (DB_USING_FINE_LOCKING(tb))
- return !erts_smp_atomic_xchg_acqb(&tb->is_resizing, 1);
- else {
- if (erts_smp_atomic_read_nob(&tb->is_resizing))
- return 0;
- erts_smp_atomic_set_nob(&tb->is_resizing, 1);
- return 1;
- }
+ return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
+ else
+ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
+ return 1;
}
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
if (DB_USING_FINE_LOCKING(tb))
- erts_smp_atomic_set_relb(&tb->is_resizing, 0);
- else
- erts_smp_atomic_set_nob(&tb->is_resizing, 0);
+ erts_atomic_set_relb(&tb->is_resizing, 0);
}
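begin_resizing is in effect a try-lock: a single acquire-ordered exchange either claims the is_resizing flag or reports that another scheduler is already resizing, and done_resizing releases it with a matching release store. Without fine locking the table-wide write lock already serializes resizers, so only the assertion remains. A sketch of the same guard in portable C11 atomics (not the ERTS primitives):

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_int is_resizing;

    /* Claim the resize flag, or return false if a resize is already in
     * progress -- the acquire exchange mirrors erts_atomic_xchg_acqb(). */
    static bool try_begin_resizing(void)
    {
        return atomic_exchange_explicit(&is_resizing, 1,
                                        memory_order_acquire) == 0;
    }

    /* Release the flag; the release store pairs with the acquire
     * exchange, as erts_atomic_set_relb() does. */
    static void end_resizing(void)
    {
        atomic_store_explicit(&is_resizing, 0, memory_order_release);
    }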
-/* Grow table with one new bucket.
+/* Grow table with one or more new buckets.
** Allocate new segment if needed.
*/
-static void grow(DbTableHash* tb, int nactive)
+static void grow(DbTableHash* tb, int nitems)
{
HashDbTerm** pnext;
HashDbTerm** to_pnext;
HashDbTerm* p;
- erts_smp_rwmtx_t* lck;
- int from_ix;
+ erts_rwmtx_t* lck;
+ int nactive;
+ int from_ix, to_ix;
int szm;
+ int loop_limit = 5;
- if (!begin_resizing(tb))
- return; /* already in progress */
- if (NACTIVE(tb) != nactive) {
- goto abort; /* already done (race) */
- }
-
- /* Ensure that the slot nactive exists */
- if (nactive == tb->nslots) {
- /* Time to get a new segment */
- ASSERT((nactive & SEGSZ_MASK) == 0);
- if (!alloc_seg(tb)) goto abort;
- }
- ASSERT(nactive < tb->nslots);
+ do {
+ if (!begin_resizing(tb))
+ return; /* already in progress */
+ nactive = NACTIVE(tb);
+ if (nitems <= GROW_LIMIT(nactive)) {
+ goto abort; /* already done (race) */
+ }
- szm = erts_smp_atomic_read_nob(&tb->szm);
- if (nactive <= szm) {
- from_ix = nactive & (szm >> 1);
- } else {
- ASSERT(nactive == szm+1);
- from_ix = 0;
- szm = (szm<<1) | 1;
- }
+ /* Ensure that the slot nactive exists */
+ if (nactive == tb->nslots) {
+ /* Time to get a new segment */
+ ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
+ alloc_seg(tb);
+ }
+ ASSERT(nactive < tb->nslots);
- lck = WLOCK_HASH(tb, from_ix);
- /* Now a final double check (with the from_ix lock held)
- * that we did not get raced by a table fixer.
- */
- if (IS_FIXED(tb)) {
- WUNLOCK_HASH(lck);
- goto abort;
- }
- erts_smp_atomic_inc_nob(&tb->nactive);
- if (from_ix == 0) {
- if (DB_USING_FINE_LOCKING(tb))
- erts_smp_atomic_set_relb(&tb->szm, szm);
- else
- erts_smp_atomic_set_nob(&tb->szm, szm);
- }
- done_resizing(tb);
+ szm = erts_atomic_read_nob(&tb->szm);
+ if (nactive <= szm) {
+ from_ix = nactive & (szm >> 1);
+ } else {
+ ASSERT(nactive == szm+1);
+ from_ix = 0;
+ szm = (szm<<1) | 1;
+ }
+ to_ix = nactive;
+
+ lck = WLOCK_HASH(tb, from_ix);
+ ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
+ /* Now a final double check (with the from_ix lock held)
+ * that we did not get raced by a table fixer.
+ */
+ if (IS_FIXED(tb)) {
+ WUNLOCK_HASH(lck);
+ goto abort;
+ }
+ erts_atomic_set_nob(&tb->nactive, ++nactive);
+ if (from_ix == 0) {
+ if (DB_USING_FINE_LOCKING(tb))
+ erts_atomic_set_relb(&tb->szm, szm);
+ else
+ erts_atomic_set_nob(&tb->szm, szm);
+ }
+ done_resizing(tb);
+
+ /* Finally, let's split the bucket. We try to do it in a smart way
+ to keep link order and avoid unnecessary updates of next-pointers */
+ pnext = &BUCKET(tb, from_ix);
+ p = *pnext;
+ to_pnext = &BUCKET(tb, to_ix);
+ while (p != NULL) {
+ if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */
+ *pnext = p->next;
+ free_term(tb, p);
+ p = *pnext;
+ }
+ else {
+ int ix = p->hvalue & szm;
+ if (ix != from_ix) {
+ ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
+ *to_pnext = p;
+ /* Swap "from" and "to": */
+ from_ix = ix;
+ to_pnext = pnext;
+ }
+ pnext = &p->next;
+ p = *pnext;
+ }
+ }
+ *to_pnext = NULL;
+ WUNLOCK_HASH(lck);
- /* Finally, let's split the bucket. We try to do it in a smart way
- to keep link order and avoid unnecessary updates of next-pointers */
- pnext = &BUCKET(tb, from_ix);
- p = *pnext;
- to_pnext = &BUCKET(tb, nactive);
- while (p != NULL) {
- if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */
- *pnext = p->next;
- free_term(tb, p);
- p = *pnext;
- }
- else {
- int ix = p->hvalue & szm;
- if (ix != from_ix) {
- ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
- *to_pnext = p;
- /* Swap "from" and "to": */
- from_ix = ix;
- to_pnext = pnext;
- }
- pnext = &p->next;
- p = *pnext;
- }
- }
- *to_pnext = NULL;
+    } while (--loop_limit && nitems > GROW_LIMIT(nactive));
- WUNLOCK_HASH(lck);
return;
abort:
@@ -2712,60 +2911,78 @@ abort:
/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
*/
-static void shrink(DbTableHash* tb, int nactive)
-{
- if (!begin_resizing(tb))
- return; /* already in progress */
- if (NACTIVE(tb) == nactive) {
- erts_smp_rwmtx_t* lck;
- int src_ix = nactive - 1;
- int low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1;
- int dst_ix = src_ix & low_szm;
-
- ASSERT(dst_ix < src_ix);
- ASSERT(nactive > SEGSZ);
- lck = WLOCK_HASH(tb, dst_ix);
- /* Double check for racing table fixers */
- if (!IS_FIXED(tb)) {
- HashDbTerm** src_bp = &BUCKET(tb, src_ix);
- HashDbTerm** dst_bp = &BUCKET(tb, dst_ix);
- HashDbTerm** bp = src_bp;
-
- /* Q: Why join lists by appending "dst" at the end of "src"?
- A: Must step through "src" anyway to purge pseudo deleted. */
- while(*bp != NULL) {
- if ((*bp)->hvalue == INVALID_HASH) {
- HashDbTerm* deleted = *bp;
- *bp = deleted->next;
- free_term(tb, deleted);
- } else {
- bp = &(*bp)->next;
- }
- }
- *bp = *dst_bp;
- *dst_bp = *src_bp;
- *src_bp = NULL;
-
- erts_smp_atomic_set_nob(&tb->nactive, src_ix);
- if (dst_ix == 0) {
- erts_smp_atomic_set_relb(&tb->szm, low_szm);
- }
- WUNLOCK_HASH(lck);
-
- if (tb->nslots - src_ix >= SEGSZ) {
- free_seg(tb, 0);
- }
- }
- else {
- WUNLOCK_HASH(lck);
- }
+static void shrink(DbTableHash* tb, int nitems)
+{
+ HashDbTerm** src_bp;
+ HashDbTerm** dst_bp;
+ HashDbTerm** bp;
+ erts_rwmtx_t* lck;
+ int src_ix, dst_ix, low_szm;
+ int nactive;
+ int loop_limit = 5;
- }
- /*else already done */
+ do {
+ if (!begin_resizing(tb))
+ return; /* already in progress */
+ nactive = NACTIVE(tb);
+ if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
+ goto abort; /* already done (race) */
+ }
+ src_ix = nactive - 1;
+ low_szm = erts_atomic_read_nob(&tb->szm) >> 1;
+ dst_ix = src_ix & low_szm;
+
+ ASSERT(dst_ix < src_ix);
+ ASSERT(nactive > FIRST_SEGSZ);
+ lck = WLOCK_HASH(tb, dst_ix);
+ ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
+ /* Double check for racing table fixers */
+ if (IS_FIXED(tb)) {
+ WUNLOCK_HASH(lck);
+ goto abort;
+ }
+
+ src_bp = &BUCKET(tb, src_ix);
+ dst_bp = &BUCKET(tb, dst_ix);
+ bp = src_bp;
+
+ /*
+ * We join lists by appending "dst" at the end of "src"
+ * as we must step through "src" anyway to purge pseudo deleted.
+ */
+ while(*bp != NULL) {
+ if (is_pseudo_deleted(*bp)) {
+ HashDbTerm* deleted = *bp;
+ *bp = deleted->next;
+ free_term(tb, deleted);
+ } else {
+ bp = &(*bp)->next;
+ }
+ }
+ *bp = *dst_bp;
+ *dst_bp = *src_bp;
+ *src_bp = NULL;
+
+ nactive = src_ix;
+ erts_atomic_set_nob(&tb->nactive, nactive);
+ if (dst_ix == 0) {
+ erts_atomic_set_relb(&tb->szm, low_szm);
+ }
+ WUNLOCK_HASH(lck);
+
+ if (tb->nslots - src_ix >= EXT_SEGSZ) {
+ free_seg(tb, 0);
+ }
+ done_resizing(tb);
+
+ } while (--loop_limit
+ && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
+ return;
+
+abort:
done_resizing(tb);
}
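Both grow() and shrink() lean on the same linear-hashing invariant: under size mask szm, a term in bucket from_ix either stays put or moves to its twin from_ix ^ ((szm+1) >> 1), the one bucket whose index differs in the new top bit, while a lookup that lands in a not-yet-split bucket folds back with the smaller mask, exactly as hash_to_ix does. A worked check of the index arithmetic (the helper is illustrative, not ERTS code):

    #include <assert.h>

    /* Where does a term with hash 'hval' live when 'nactive' buckets are
     * in use and the size masks are szm (large) and szm >> 1 (small)?
     * This is the rule hash_to_ix() and grow()/shrink() agree on. */
    static int live_ix(unsigned hval, unsigned nactive, unsigned szm)
    {
        unsigned ix = hval & szm;
        if (ix >= nactive)
            ix &= szm >> 1;     /* top half not split yet: fold back down */
        return (int) ix;
    }

    int main(void)
    {
        /* A 256 -> 512 doubling in progress: nactive = 300, szm = 511.
         * Buckets 0..43 are split (twins 256..299 exist); 44..255 not. */
        unsigned szm = 511, nactive = 300;

        assert(live_ix(20,       nactive, szm) == 20);    /* split, low half  */
        assert(live_ix(20 + 256, nactive, szm) == 276);   /* split, high half */
        assert(live_ix(60 + 256, nactive, szm) == 60);    /* unsplit: folds   */

        /* The next grow() step splits from_ix = 300 & 255 = 44, moving the
         * terms whose hash has bit 8 set into to_ix = 44 ^ 256 = 300. */
        assert((nactive & (szm >> 1)) == 44);
        assert((44 ^ ((szm + 1) >> 1)) == 300);
        return 0;
    }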
-
/* Search a list of tuples for a matching key */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
@@ -2784,15 +3001,15 @@ static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
 /* It returns the next live object in a table, NULL if no more */
/* In-bucket: RLOCKED */
/* Out-bucket: RLOCKED unless NULL */
-static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
- HashDbTerm *list)
+static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
+ HashDbTerm *list)
{
int i;
- ERTS_SMP_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));
+ ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));
- for (list = list->next; list != NULL; list = list->next) {
- if (list->hvalue != INVALID_HASH)
+ for ( ; list != NULL; list = list->next) {
+ if (!is_pseudo_deleted(list))
return list;
}
@@ -2801,7 +3018,7 @@ static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
list = BUCKET(tb,i);
while (list != NULL) {
- if (list->hvalue != INVALID_HASH) {
+ if (!is_pseudo_deleted(list)) {
*iptr = i;
return list;
}
@@ -2819,7 +3036,7 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbTableHash *tb = &tbl->hash;
HashValue hval;
HashDbTerm **bp, *b;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
int flags = 0;
ASSERT(tb->common.status & DB_SET);
@@ -2834,7 +3051,7 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
break;
}
if (has_key(tb, b, key, hval)) {
- if (b->hvalue != INVALID_HASH) {
+ if (!is_pseudo_deleted(b)) {
goto Ldone;
}
break;
@@ -2864,18 +3081,20 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
HashDbTerm *q = new_dbterm(tb, obj);
q->hvalue = hval;
+ q->pseudo_deleted = 0;
q->next = NULL;
*bp = b = q;
flags |= DB_INC_TRY_GROW;
} else {
HashDbTerm *q, *next = b->next;
- ASSERT(b->hvalue == INVALID_HASH);
+ ASSERT(is_pseudo_deleted(b));
q = replace_dbterm(tb, b, obj);
q->next = next;
- q->hvalue = hval;
+ ASSERT(q->hvalue == hval);
+ q->pseudo_deleted = 0;
*bp = b = q;
- erts_smp_atomic_inc_nob(&tb->common.nitems);
+ erts_atomic_inc_nob(&tb->common.nitems);
}
HRelease(p, hend, htop);
@@ -2901,24 +3120,24 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp = (HashDbTerm **) handle->bp;
HashDbTerm *b = *bp;
- erts_smp_rwmtx_t* lck = (erts_smp_rwmtx_t*) handle->lck;
+ erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
HashDbTerm* free_me = NULL;
- ERTS_SMP_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */
+ ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */
ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));
if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
0)) {
- b->hvalue = INVALID_HASH;
+ b->pseudo_deleted = 1;
} else {
*bp = b->next;
free_me = b;
}
WUNLOCK_HASH(lck);
- erts_smp_atomic_dec_nob(&tb->common.nitems);
+ erts_atomic_dec_nob(&tb->common.nitems);
try_shrink(tb);
} else {
if (handle->flags & DB_MUST_RESIZE) {
@@ -2927,12 +3146,12 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
}
if (handle->flags & DB_INC_TRY_GROW) {
int nactive;
- int nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems);
+ int nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
nactive = NACTIVE(tb);
- if (nitems > nactive * (CHAIN_LEN + 1) && !IS_FIXED(tb)) {
- grow(tb, nactive);
+ if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
+ grow(tb, nitems);
}
} else {
WUNLOCK_HASH(lck);
@@ -2948,16 +3167,19 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
return;
}
-static int db_delete_all_objects_hash(Process* p, DbTable* tbl)
+static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
if (IS_FIXED(tbl)) {
- db_mark_all_deleted_hash(tbl);
+ reds = db_mark_all_deleted_hash(tbl, reds);
} else {
- db_free_table_hash(tbl);
+ reds = db_free_table_continue_hash(tbl, reds);
+ if (reds < 0)
+ return reds;
+
db_create_hash(p, tbl);
- erts_smp_atomic_set_nob(&tbl->hash.common.nitems, 0);
+ erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
}
- return 0;
+ return reds;
}
void db_foreach_offheap_hash(DbTable *tbl,
@@ -2985,7 +3207,7 @@ void db_foreach_offheap_hash(DbTable *tbl,
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
+ erts_rwmtx_t* lck;
int sum = 0;
int sq_sum = 0;
int kept_items = 0;
@@ -3000,7 +3222,7 @@ void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
len = 0;
for (b = BUCKET(tb,ix); b!=NULL; b=b->next) {
len++;
- if (b->hvalue == INVALID_HASH)
+ if (is_pseudo_deleted(b))
++kept_items;
}
sum += len;
@@ -3016,24 +3238,30 @@ void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
stats->kept_items = kept_items;
}
-#ifdef HARDDEBUG
-void db_check_table_hash(DbTable *tbl)
+/* For testing only */
+Eterm erts_ets_hash_sizeof_ext_segtab(void)
{
- DbTableHash *tb = &tbl->hash;
- HashDbTerm* list;
- int j;
-
- for (j = 0; j < tb->nactive; j++) {
- if ((list = BUCKET(tb,j)) != 0) {
- while (list != 0) {
- if (!is_tuple(make_tuple(list->dbterm.tpl))) {
- erts_exit(ERTS_ERROR_EXIT, "Bad term in slot %d of ets table", j);
- }
- list = list->next;
- }
- }
- }
+ return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
}
-#endif
+#ifdef ERTS_ENABLE_LOCK_COUNT
+void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
+ int i;
+
+ if(tb->locks == NULL) {
+ return;
+ }
+
+ for(i = 0; i < DB_HASH_LOCK_CNT; i++) {
+ erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt;
+
+ if(enable) {
+ erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
+ ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
+ } else {
+ erts_lcnt_uninstall(ref);
+ }
+ }
+}
+#endif /* ERTS_ENABLE_LOCK_COUNT */