diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_hash.c')
-rw-r--r-- | erts/emulator/beam/erl_db_hash.c | 2783 |
1 files changed, 1457 insertions, 1326 deletions
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index 390369fdb9..cb5c496e90 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1998-2016. All Rights Reserved. + * Copyright Ericsson AB 1998-2017. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -84,46 +84,47 @@ #include "erl_db_hash.h" -#ifdef MYDEBUG /* Will fail test case ets_SUITE:memory */ -# define IF_DEBUG(x) x -# define MY_ASSERT(x) ASSERT(x) -#else -# define IF_DEBUG(x) -# define MY_ASSERT(x) -#endif - /* * The following symbols can be manipulated to "tune" the linear hash array */ -#define CHAIN_LEN 6 /* Medium bucket chain len */ +#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1) +#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2) + +/* +** We want the first mandatory segment to be small (to reduce minimal footprint) +** and larger extra segments (to reduce number of alloc/free calls). +*/ -/* Number of slots per segment */ -#define SEGSZ_EXP 8 -#define SEGSZ (1 << SEGSZ_EXP) -#define SEGSZ_MASK (SEGSZ-1) +/* Number of slots in first segment */ +#define FIRST_SEGSZ_EXP 8 +#define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP) +#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1) -#define NSEG_1 2 /* Size of first segment table (must be at least 2) */ +/* Number of slots per extra segment */ +#define EXT_SEGSZ_EXP 11 +#define EXT_SEGSZ (1 << EXT_SEGSZ_EXP) +#define EXT_SEGSZ_MASK (EXT_SEGSZ-1) + +#define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*)) #define NSEG_2 256 /* Size of second segment table */ #define NSEG_INC 128 /* Number of segments to grow after that */ -#ifdef ERTS_SMP # define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED) -#else -# define DB_USING_FINE_LOCKING(TB) 0 -#endif #ifdef ETHR_ORDERED_READ_DEPEND -#define SEGTAB(tb) ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab)) +#define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)) #else #define SEGTAB(tb) \ (DB_USING_FINE_LOCKING(tb) \ - ? ((struct segment**) erts_smp_atomic_read_ddrb(&(tb)->segtab)) \ - : ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab))) + ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab)) \ + : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))) #endif -#define NACTIVE(tb) ((int)erts_smp_atomic_read_nob(&(tb)->nactive)) -#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems)) +#define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive)) +#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems)) -#define BUCKET(tb, i) SEGTAB(tb)[(i) >> SEGSZ_EXP]->buckets[(i) & SEGSZ_MASK] +#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP) + +#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK] /* * When deleting a table, the number of records to delete. @@ -137,12 +138,12 @@ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval) { Uint mask = (DB_USING_FINE_LOCKING(tb) - ? erts_smp_atomic_read_acqb(&tb->szm) - : erts_smp_atomic_read_nob(&tb->szm)); + ? erts_atomic_read_acqb(&tb->szm) + : erts_atomic_read_nob(&tb->szm)); Uint ix = hval & mask; - if (ix >= erts_smp_atomic_read_nob(&tb->nactive)) { + if (ix >= erts_atomic_read_nob(&tb->nactive)) { ix &= mask>>1; - ASSERT(ix < erts_smp_atomic_read_nob(&tb->nactive)); + ASSERT(ix < erts_atomic_read_nob(&tb->nactive)); } return ix; } @@ -161,7 +162,7 @@ static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); fixd->slot = ix; - was_next = erts_smp_atomic_read_acqb(&tb->fixdel); + was_next = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic insertion in linked list: */ if (NFIXED(tb) <= fixated_by_me) { erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb, @@ -170,7 +171,7 @@ static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix, } exp_next = was_next; fixd->next = (FixedDeletion*) exp_next; - was_next = erts_smp_atomic_cmpxchg_mb(&tb->fixdel, + was_next = erts_atomic_cmpxchg_mb(&tb->fixdel, (erts_aint_t) fixd, exp_next); }while (was_next != exp_next); @@ -184,63 +185,57 @@ static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix, /* optimised version of make_hash (normal case? atomic key) */ #define MAKE_HASH(term) \ ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ - make_internal_hash(term)) % MAX_HASH) + make_internal_hash(term, 0)) % MAX_HASH) -#ifdef ERTS_SMP # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1) # define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck) +# define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval)) /* Fine grained read lock */ -static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval) +static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { - erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval); + erts_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); - erts_smp_rwmtx_rlock(lck); + erts_rwmtx_rlock(lck); return lck; } } /* Fine grained write lock */ -static ERTS_INLINE erts_smp_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval) +static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { - erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval); + erts_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); - erts_smp_rwmtx_rwlock(lck); + erts_rwmtx_rwlock(lck); return lck; } } -static ERTS_INLINE void RUNLOCK_HASH(erts_smp_rwmtx_t* lck) +static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck) { if (lck != NULL) { - erts_smp_rwmtx_runlock(lck); + erts_rwmtx_runlock(lck); } } -static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck) +static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck) { if (lck != NULL) { - erts_smp_rwmtx_rwunlock(lck); + erts_rwmtx_rwunlock(lck); } } -#else /* ERTS_SMP */ -# define RLOCK_HASH(tb,hval) NULL -# define WLOCK_HASH(tb,hval) NULL -# define RUNLOCK_HASH(lck) ((void)lck) -# define WUNLOCK_HASH(lck) ((void)lck) -#endif /* ERTS_SMP */ #ifdef ERTS_ENABLE_LOCK_CHECK # define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd)) -# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval))) -# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rwlocked(lck)) -# define IS_TAB_WLOCKED(tb) erts_smp_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock) +# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval))) +# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck)) +# define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock) #else # define IS_HASH_RLOCKED(tb,hval) (1) # define IS_HASH_WLOCKED(tb,hval) (1) @@ -253,33 +248,25 @@ static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck) ** Slot READ locks updated accordingly, unlocked if EOT. */ static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix, - erts_smp_rwmtx_t** lck_ptr) + erts_rwmtx_t** lck_ptr) { -#ifdef ERTS_SMP ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; RUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix); return ix; -#else - return (++ix < NACTIVE(tb)) ? ix : 0; -#endif } /* Same as next_slot but with WRITE locking */ static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix, - erts_smp_rwmtx_t** lck_ptr) + erts_rwmtx_t** lck_ptr) { -#ifdef ERTS_SMP ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; WUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix); return ix; -#else - return next_slot(tb,ix,lck_ptr); -#endif } @@ -318,83 +305,52 @@ struct mp_info { /* A table segment */ struct segment { - HashDbTerm* buckets[SEGSZ]; -#ifdef MYDEBUG - int is_ext_segment; -#endif + HashDbTerm* buckets[1]; }; +#define SIZEOF_SEGMENT(N) \ + (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N)) -/* A segment that also contains a segment table */ -struct ext_segment { - struct segment s; /* The segment itself. Must be first */ - +/* An extended segment table */ +struct ext_segtab { + ErtsThrPrgrLaterOp lop; struct segment** prev_segtab; /* Used when table is shrinking */ - int nsegs; /* Size of segtab */ + int prev_nsegs; /* Size of prev_segtab */ + int nsegs; /* Size of this segtab */ struct segment* segtab[1]; /* The segment table */ }; -#define SIZEOF_EXTSEG(NSEGS) \ - (offsetof(struct ext_segment,segtab) + sizeof(struct segment*)*(NSEGS)) - -#if defined(DEBUG) || defined(VALGRIND) -# define EXTSEG(SEGTAB_PTR) \ - ((struct ext_segment*) (((char*)(SEGTAB_PTR)) - offsetof(struct ext_segment,segtab))) -#endif +#define SIZEOF_EXT_SEGTAB(NSEGS) \ + (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS)) static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb, struct segment** segtab) { if (DB_USING_FINE_LOCKING(tb)) - erts_smp_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab); + erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab); else - erts_smp_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab); -#ifdef VALGRIND - tb->top_ptr_to_segment_with_active_segtab = EXTSEG(segtab); -#endif + erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab); } - -/* How the table segments relate to each other: - - ext_segment: ext_segment: "plain" segment - #=================# #================# #=============# - | bucket[0] |<--+ +------->| bucket[256] | +->| bucket[512] | - | bucket[1] | | | | [257] | | | [513] | - : : | | : : | : : - | bucket[255] | | | | [511] | | | [767] | - |-----------------| | | |----------------| | #=============# - | prev_segtab=NULL| | | +--<---prev_segtab | | - | nsegs = 2 | | | | | nsegs = 256 | | -+->| segtab[0] -->-------+---|---|--<---segtab[0] |<-+ | -| | segtab[1] -->-----------+---|--<---segtab[1] | | | -| #=================# | | segtab[2] -->-----|--+ ext_segment: -| | : : | #================# -+----------------<---------------+ | segtab[255] ->----|----->| bucket[255*256]| - #================# | | | - | : : - | |----------------| - +----<---prev_segtab | - : : -*/ - +/* Used by select_replace on analyze_pattern */ +typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body); /* ** Forward decl's (static functions) */ -static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix, - struct segment** old_segtab); -static int alloc_seg(DbTableHash *tb); +static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix); +static void alloc_seg(DbTableHash *tb); static int free_seg(DbTableHash *tb, int free_records); -static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr, - HashDbTerm *list); +static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr, + HashDbTerm *list); static HashDbTerm* search_list(DbTableHash* tb, Eterm key, HashValue hval, HashDbTerm *list); -static void shrink(DbTableHash* tb, int nactive); -static void grow(DbTableHash* tb, int nactive); +static void shrink(DbTableHash* tb, int nitems); +static void grow(DbTableHash* tb, int nitems); static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, Uint sz, DbTableHash*); -static int analyze_pattern(DbTableHash *tb, Eterm pattern, - struct mp_info *mpi); +static int analyze_pattern(DbTableHash *tb, Eterm pattern, + extra_match_validator_t extra_validator, /* Optional callback */ + struct mp_info *mpi); /* * Method interface functions @@ -418,24 +374,29 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); -static int db_select_chunk_hash(Process *p, DbTable *tbl, +static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret); -static int db_select_hash(Process *p, DbTable *tbl, +static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret); -static int db_select_count_hash(Process *p, DbTable *tbl, - Eterm pattern, Eterm *ret); -static int db_select_delete_hash(Process *p, DbTable *tbl, - Eterm pattern, Eterm *ret); - -static int db_select_continue_hash(Process *p, DbTable *tbl, +static int db_select_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); -static int db_select_count_continue_hash(Process *p, DbTable *tbl, +static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_count_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); +static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); static int db_select_delete_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); + +static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_replace_continue_hash(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); + static int db_take_hash(Process *, DbTable *, Eterm, Eterm *); static void db_print_hash(fmtfn_t to, void *to_arg, @@ -443,7 +404,7 @@ static void db_print_hash(fmtfn_t to, DbTable *tbl); static int db_free_table_hash(DbTable *tbl); -static int db_free_table_continue_hash(DbTable *tbl); +static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds); static void db_foreach_offheap_hash(DbTable *, @@ -463,9 +424,10 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle); static ERTS_INLINE void try_shrink(DbTableHash* tb) { int nactive = NACTIVE(tb); - if (nactive > SEGSZ && NITEMS(tb) < (nactive * CHAIN_LEN) + int nitems = NITEMS(tb); + if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive) && !IS_FIXED(tb)) { - shrink(tb, nactive); + shrink(tb, nitems); } } @@ -547,17 +509,14 @@ DbTableMethod db_hash = db_select_delete_continue_hash, db_select_count_hash, db_select_count_continue_hash, + db_select_replace_hash, + db_select_replace_continue_hash, db_take_hash, db_delete_all_objects_hash, db_free_table_hash, db_free_table_continue_hash, db_print_hash, db_foreach_offheap_hash, -#ifdef HARDDEBUG - db_check_table_hash, -#else - NULL, -#endif db_lookup_dbterm_hash, db_finalize_dbterm_hash }; @@ -579,7 +538,7 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel) { /*int tries = 0;*/ DEBUG_WAIT(); - if (erts_smp_atomic_cmpxchg_relb(&tb->fixdel, + if (erts_atomic_cmpxchg_relb(&tb->fixdel, (erts_aint_t) fixdel, (erts_aint_t) NULL) != (erts_aint_t) NULL) { /* Oboy, must join lists */ @@ -588,13 +547,13 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel) erts_aint_t exp_tail; while (last->next != NULL) last = last->next; - was_tail = erts_smp_atomic_read_acqb(&tb->fixdel); + was_tail = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic list insertion */ exp_tail = was_tail; last->next = (FixedDeletion*) exp_tail; /*++tries;*/ DEBUG_WAIT(); - was_tail = erts_smp_atomic_cmpxchg_relb(&tb->fixdel, + was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel, (erts_aint_t) fixdel, exp_tail); }while (was_tail != exp_tail); @@ -605,22 +564,23 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel) ** Table interface routines ie what's called by the bif's */ -void db_unfix_table_hash(DbTableHash *tb) +SWord db_unfix_table_hash(DbTableHash *tb) { FixedDeletion* fixdel; + SWord work = 0; - ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock) - || (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock) + ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock) + || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock) && !tb->common.is_thread_safe)); restart: - fixdel = (FixedDeletion*) erts_smp_atomic_xchg_mb(&tb->fixdel, + fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel, (erts_aint_t) NULL); while (fixdel != NULL) { FixedDeletion *fx = fixdel; int ix = fx->slot; HashDbTerm **bp; HashDbTerm *b; - erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,ix); + erts_rwmtx_t* lck = WLOCK_HASH(tb,ix); if (IS_FIXED(tb)) { /* interrupted by fixer */ WUNLOCK_HASH(lck); @@ -628,7 +588,7 @@ restart: if (!IS_FIXED(tb)) { goto restart; /* unfixed again! */ } - return; + return work; } if (ix < NACTIVE(tb)) { bp = &BUCKET(tb, ix); @@ -638,6 +598,7 @@ restart: if (b->hvalue == INVALID_HASH) { *bp = b->next; free_term(tb, b); + work++; b = *bp; } else { bp = &b->next; @@ -653,49 +614,54 @@ restart: (void *) fx, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); + work++; } /* ToDo: Maybe try grow/shrink the table as well */ + + return work; } int db_create_hash(Process *p, DbTable *tbl) { DbTableHash *tb = &tbl->hash; - erts_smp_atomic_init_nob(&tb->szm, SEGSZ_MASK); - erts_smp_atomic_init_nob(&tb->nactive, SEGSZ); - erts_smp_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL); - erts_smp_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL); - SET_SEGTAB(tb, alloc_ext_seg(tb,0,NULL)->segtab); + erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK); + erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ); + erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL); + erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL); + SET_SEGTAB(tb, tb->first_segtab); tb->nsegs = NSEG_1; - tb->nslots = SEGSZ; + tb->nslots = FIRST_SEGSZ; + tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG, + (DbTable *) tb, + SIZEOF_SEGMENT(FIRST_SEGSZ)); + sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ)); - erts_smp_atomic_init_nob(&tb->is_resizing, 0); -#ifdef ERTS_SMP + erts_atomic_init_nob(&tb->is_resizing, 0); if (tb->common.type & DB_FINE_LOCKED) { - erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; + erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; int i; if (tb->common.type & DB_FREQ_READ) - rwmtx_opt.type = ERTS_SMP_RWMTX_TYPE_FREQUENT_READ; + rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; if (erts_ets_rwmtx_spin_count >= 0) rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; - tb->locks = (DbTableHashFineLocks*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, /* Other type maybe? */ - (DbTable *) tb, - sizeof(DbTableHashFineLocks)); + tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */ + (DbTable *) tb, + sizeof(DbTableHashFineLocks)); for (i=0; i<DB_HASH_LOCK_CNT; ++i) { - erts_smp_rwmtx_init_opt_x(&tb->locks->lck_vec[i].lck, &rwmtx_opt, - "db_hash_slot", make_small(i)); + erts_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt, + "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB); } - /* This important property is needed to guarantee that the buckets + /* This important property is needed to guarantee the two buckets * involved in a grow/shrink operation it protected by the same lock: */ - ASSERT(erts_smp_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0); + ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0); } else { /* coarse locking */ tb->locks = NULL; } ERTS_THR_MEMORY_BARRIER; -#endif /* ERST_SMP */ return DB_ERROR_NONE; } @@ -703,22 +669,12 @@ static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret) { DbTableHash *tb = &tbl->hash; Uint ix = 0; - erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix); + erts_rwmtx_t* lck = RLOCK_HASH(tb,ix); HashDbTerm* list; - for (;;) { - list = BUCKET(tb,ix); - if (list != NULL) { - if (list->hvalue == INVALID_HASH) { - list = next(tb,&ix,&lck,list); - } - break; - } - if ((ix=next_slot(tb,ix,&lck)) == 0) { - list = NULL; - break; - } - } + list = BUCKET(tb,ix); + list = next_live(tb, &ix, &lck, list); + if (list != NULL) { *ret = db_copy_key(p, tbl, &list->dbterm); RUNLOCK_HASH(lck); @@ -736,7 +692,7 @@ static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) HashValue hval; Uint ix; HashDbTerm* b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); @@ -755,13 +711,13 @@ static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) } /* Key found */ - b = next(tb, &ix, &lck, b); + b = next_live(tb, &ix, &lck, b->next); if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while (b != 0) { if (!has_live_key(tb, b, key, hval)) { break; } - b = next(tb, &ix, &lck, b); + b = next_live(tb, &ix, &lck, b->next); } } if (b == NULL) { @@ -783,7 +739,7 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) HashDbTerm** bp; HashDbTerm* b; HashDbTerm* q; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int nitems; int ret = DB_ERROR_NONE; @@ -809,7 +765,7 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) if (tb->common.status & DB_SET) { HashDbTerm* bnext = b->next; if (b->hvalue == INVALID_HASH) { - erts_smp_atomic_inc_nob(&tb->common.nitems); + erts_atomic_inc_nob(&tb->common.nitems); } else if (key_clash_fail) { ret = DB_ERROR_BADKEY; @@ -837,7 +793,7 @@ int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) do { if (db_eq(&tb->common,obj,&q->dbterm)) { if (q->hvalue == INVALID_HASH) { - erts_smp_atomic_inc_nob(&tb->common.nitems); + erts_atomic_inc_nob(&tb->common.nitems); q->hvalue = hval; if (q != b) { /* must move to preserve key insertion order */ *qp = q->next; @@ -858,15 +814,14 @@ Lnew: q->hvalue = hval; q->next = b; *bp = q; - nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems); + nitems = erts_atomic_inc_read_nob(&tb->common.nitems); WUNLOCK_HASH(lck); { int nactive = NACTIVE(tb); - if (nitems > nactive * (CHAIN_LEN+1) && !IS_FIXED(tb)) { - grow(tb, nactive); + if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) { + grow(tb, nitems); } } - CHECK_TABLES(); return DB_ERROR_NONE; Ldone: @@ -891,7 +846,6 @@ get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval, } } copy = build_term_list(p, b1, b2, sz, tb); - CHECK_TABLES(); if (bend) { *bend = b2; } @@ -904,7 +858,7 @@ int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) HashValue hval; int ix; HashDbTerm* b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); @@ -923,70 +877,6 @@ done: RUNLOCK_HASH(lck); return DB_ERROR_NONE; } - -int db_get_element_array(DbTable *tbl, - Eterm key, - int ndex, - Eterm *ret, - int *num_ret) -{ - DbTableHash *tb = &tbl->hash; - HashValue hval; - int ix; - HashDbTerm* b1; - int num = 0; - int retval; - erts_smp_rwmtx_t* lck; - - ASSERT(!IS_FIXED(tbl)); /* no support for fixed tables here */ - - hval = MAKE_HASH(key); - lck = RLOCK_HASH(tb, hval); - ix = hash_to_ix(tb, hval); - b1 = BUCKET(tb, ix); - - while(b1 != 0) { - if (has_live_key(tb,b1,key,hval)) { - if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { - HashDbTerm* b; - HashDbTerm* b2 = b1->next; - - while(b2 != NULL && has_live_key(tb,b2,key,hval)) { - if (ndex > arityval(b2->dbterm.tpl[0])) { - retval = DB_ERROR_BADITEM; - goto done; - } - b2 = b2->next; - } - - b = b1; - while(b != b2) { - if (num < *num_ret) { - ret[num++] = b->dbterm.tpl[ndex]; - } else { - retval = DB_ERROR_NONE; - goto done; - } - b = b->next; - } - *num_ret = num; - } - else { - ASSERT(*num_ret > 0); - ret[0] = b1->dbterm.tpl[ndex]; - *num_ret = 1; - } - retval = DB_ERROR_NONE; - goto done; - } - b1 = b1->next; - } - retval = DB_ERROR_BADKEY; -done: - RUNLOCK_HASH(lck); - return retval; -} - static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret) { @@ -994,7 +884,7 @@ static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret) HashValue hval; int ix; HashDbTerm* b1; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; hval = MAKE_HASH(key); ix = hash_to_ix(tb, hval); @@ -1023,7 +913,7 @@ static int db_get_element_hash(Process *p, DbTable *tbl, HashValue hval; int ix; HashDbTerm* b1; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int retval; hval = MAKE_HASH(key); @@ -1079,54 +969,6 @@ done: } /* - * Very internal interface, removes elements of arity two from - * BAG. Used for the PID meta table - */ -int db_erase_bag_exact2(DbTable *tbl, Eterm key, Eterm value) -{ - DbTableHash *tb = &tbl->hash; - HashValue hval; - int ix; - HashDbTerm** bp; - HashDbTerm* b; - erts_smp_rwmtx_t* lck; - int found = 0; - - hval = MAKE_HASH(key); - lck = WLOCK_HASH(tb,hval); - ix = hash_to_ix(tb, hval); - bp = &BUCKET(tb, ix); - b = *bp; - - ASSERT(!IS_FIXED(tb)); - ASSERT((tb->common.status & DB_BAG)); - ASSERT(!tb->common.compress); - - while(b != 0) { - if (has_live_key(tb,b,key,hval)) { - found = 1; - if ((arityval(b->dbterm.tpl[0]) == 2) && - EQ(value, b->dbterm.tpl[2])) { - *bp = b->next; - free_term(tb, b); - erts_smp_atomic_dec_nob(&tb->common.nitems); - b = *bp; - break; - } - } else if (found) { - break; - } - bp = &b->next; - b = b->next; - } - WUNLOCK_HASH(lck); - if (found) { - try_shrink(tb); - } - return DB_ERROR_NONE; -} - -/* ** NB, this is for the db_erase/2 bif. */ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) @@ -1136,7 +978,7 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) int ix; HashDbTerm** bp; HashDbTerm* b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int nitems_diff = 0; hval = MAKE_HASH(key); @@ -1168,7 +1010,7 @@ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) } WUNLOCK_HASH(lck); if (nitems_diff) { - erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff); + erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } *ret = am_true; @@ -1185,7 +1027,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) int ix; HashDbTerm** bp; HashDbTerm* b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int nitems_diff = 0; int nkeys = 0; Eterm key; @@ -1226,7 +1068,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) } WUNLOCK_HASH(lck); if (nitems_diff) { - erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff); + erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } *ret = am_true; @@ -1237,7 +1079,7 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { DbTableHash *tb = &tbl->hash; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; Sint slot; int retval; int nactive; @@ -1273,822 +1115,1102 @@ static BIF_RETTYPE bif_trap1(Export *bif, { BIF_TRAP1(bif, p, p1); } - + + /* - * Continue collecting select matches, this may happen either due to a trap - * or when the user calls ets:select/1 + * Match traversal callbacks */ -static int db_select_continue_hash(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) -{ - DbTableHash *tb = &tbl->hash; - Sint slot_ix; - Sint save_slot_ix; - Sint chunk_size; - int all_objects; - Binary *mp; - int num_left = 1000; - HashDbTerm *current = 0; - Eterm match_list; - Eterm *hp; - Eterm match_res; - Sint got; - Eterm *tptr; - erts_smp_rwmtx_t* lck; -#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); - - /* Decode continuation. We know it's a tuple but not the arity or anything else */ +/* Called when no match is possible. + * context_ptr: Pointer to context + * ret: Pointer to traversal function term return. + * + * Both the direct return value and 'ret' are used as the traversal function return values. + */ +typedef int (*mtraversal_on_nothing_can_match_t)(void* context_ptr, Eterm* ret); + +/* Called for each match result. + * context_ptr: Pointer to context + * slot_ix: Current slot index + * current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element; + * can be (carefully) used to adjust iteration when deleting or replacing elements. + * match_res: The result of running the match program against the current term. + * + * Should return 1 for successful match, 0 otherwise. + */ +typedef int (*mtraversal_on_match_res_t)(void* context_ptr, Sint slot_ix, HashDbTerm*** current_ptr_ptr, + Eterm match_res); + +/* Called when either we've matched enough elements in this cycle or EOT was reached. + * context_ptr: Pointer to context + * slot_ix: Current slot index + * got: How many elements have been matched so far + * iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle + * mpp: Double pointer to the compiled match program + * ret: Pointer to traversal function term return. + * + * Both the direct return value and 'ret' are used as the traversal function return values. + * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) + */ +typedef int (*mtraversal_on_loop_ended_t)(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret); + +/* Called when it's time to trap + * context_ptr: Pointer to context + * slot_ix: Current slot index + * got: How many elements have been matched so far + * mpp: Double pointer to the compiled match program + * ret: Pointer to traversal function term return. + * + * Both the direct return value and 'ret' are used as the traversal function return values. + * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) + */ +typedef int (*mtraversal_on_trap_t)(void* context_ptr, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret); - tptr = tuple_val(continuation); +/* + * Begin hash table match traversal + */ +static int match_traverse(Process* p, DbTableHash* tb, + Eterm pattern, + extra_match_validator_t extra_match_validator, /* Optional */ + Sint chunk_size, /* If 0, no chunking */ + Sint iterations_left, /* Nr. of iterations left */ + Eterm** hpp, /* Heap */ + int lock_for_write, /* Set to 1 if we're going to delete or + modify existing terms */ + mtraversal_on_nothing_can_match_t on_nothing_can_match, + mtraversal_on_match_res_t on_match_res, + mtraversal_on_loop_ended_t on_loop_ended, + mtraversal_on_trap_t on_trap, + void* context_ptr, /* State for callbacks above */ + Eterm* ret) +{ + Sint slot_ix; /* Slot index */ + HashDbTerm** current_ptr; /* Refers to either the bucket pointer or + * the 'next' pointer in the previous term + */ + HashDbTerm* saved_current; /* Helper to avoid double skip on match */ + struct mp_info mpi; + unsigned current_list_pos = 0; /* Prefound buckets list index */ + Eterm match_res; + Sint got = 0; /* Matched terms counter */ + erts_rwmtx_t* lck; /* Slot lock */ + int ret_value; + erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue) + = (lock_for_write ? WLOCK_HASH : RLOCK_HASH); + void (*unlock_hash_function)(erts_rwmtx_t*) + = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH); + Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**) + = (lock_for_write ? next_slot_w : next_slot); + + if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi)) + != DB_ERROR_NONE) + { + *ret = NIL; + goto done; + } - if (arityval(*tptr) != 6) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - - if (!is_small(tptr[2]) || !is_small(tptr[3]) || !is_binary(tptr[4]) || - !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6])) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - if ((chunk_size = signed_val(tptr[3])) < 0) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG)) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - mp = ((ProcBin *) binary_val(tptr[4]))->val; - if (!IsMatchProgBinary(mp)) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS; - match_list = tptr[5]; - if ((got = signed_val(tptr[6])) < 0) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); + if (!mpi.something_can_match) { + /* Can't possibly match anything */ + ret_value = on_nothing_can_match(context_ptr, ret); + goto done; + } - slot_ix = signed_val(tptr[2]); - if (slot_ix < 0 /* EOT */ - || (chunk_size && got >= chunk_size)) { - goto done; /* Already got all or enough in the match_list */ + if (mpi.all_objects) { + mpi.mp->intern.flags |= BIN_FLAG_ALL_OBJECTS; } - lck = RLOCK_HASH(tb,slot_ix); - if (slot_ix >= NACTIVE(tb)) { - RUNLOCK_HASH(lck); - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); + /* + * Look for initial slot / bucket + */ + if (!mpi.key_given) { + /* Run this code if pattern is variable or GETKEY(pattern) */ + /* is a variable */ + slot_ix = 0; + lck = lock_hash_function(tb,slot_ix); + for (;;) { + ASSERT(slot_ix < NACTIVE(tb)); + if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) { + break; + } + slot_ix = next_slot_function(tb,slot_ix,&lck); + if (slot_ix == 0) { + ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret); + goto done; + } + } + } else { + /* We have at least one */ + slot_ix = mpi.lists[current_list_pos].ix; + lck = lock_hash_function(tb, slot_ix); + current_ptr = mpi.lists[current_list_pos].bucket; + ASSERT(*current_ptr == BUCKET(tb,slot_ix)); + ++current_list_pos; } - while ((current = BUCKET(tb,slot_ix)) == NULL) { - slot_ix = next_slot(tb, slot_ix, &lck); - if (slot_ix == 0) { - slot_ix = -1; /* EOT */ - goto done; - } - } + /* + * Execute traversal cycle + */ for(;;) { - if (current->hvalue != INVALID_HASH && - (match_res = db_match_dbterm(&tb->common, p, mp, all_objects, - ¤t->dbterm, &hp, 2), - is_value(match_res))) { + if (*current_ptr != NULL) { + if ((*current_ptr)->hvalue != INVALID_HASH) { + match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0, + &(*current_ptr)->dbterm, hpp, 2); + saved_current = *current_ptr; + if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) { + ++got; + } + --iterations_left; + if (*current_ptr != saved_current) { + /* Don't advance to next, the callback did it already */ + continue; + } + } + current_ptr = &((*current_ptr)->next); + } + else if (mpi.key_given) { /* Key is bound */ + unlock_hash_function(lck); + if (current_list_pos == mpi.num_lists) { + ret_value = on_loop_ended(context_ptr, -1, got, iterations_left, &mpi.mp, ret); + goto done; + } else { + slot_ix = mpi.lists[current_list_pos].ix; + lck = lock_hash_function(tb, slot_ix); + current_ptr = mpi.lists[current_list_pos].bucket; + ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); + ++current_list_pos; + } + } + else { /* Key is variable */ + if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) { + slot_ix = -1; + break; + } + if (chunk_size && got >= chunk_size) { + unlock_hash_function(lck); + break; + } + if (iterations_left <= 0 || MBUF(p)) { + /* + * We have either reached our limit, or just created some heap fragments. + * Since many heap fragments will make the GC slower, trap and GC now. + */ + unlock_hash_function(lck); + ret_value = on_trap(context_ptr, slot_ix, got, &mpi.mp, ret); + goto done; + } + current_ptr = &BUCKET(tb,slot_ix); + } + } - match_list = CONS(hp, match_res, match_list); - ++got; - } + ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret); - --num_left; - save_slot_ix = slot_ix; - if ((current = next(tb, (Uint*)&slot_ix, &lck, current)) == NULL) { - slot_ix = -1; /* EOT */ - break; - } - if (slot_ix != save_slot_ix) { - if (chunk_size && got >= chunk_size) { - RUNLOCK_HASH(lck); - break; - } - if (num_left <= 0 || MBUF(p)) { - /* - * We have either reached our limit, or just created some heap fragments. - * Since many heap fragments will make the GC slower, trap and GC now. - */ - RUNLOCK_HASH(lck); - goto trap; - } - } - } done: - BUMP_REDS(p, 1000 - num_left); - if (chunk_size) { - Eterm continuation; - Eterm rest = NIL; - Sint rest_size = 0; - - if (got > chunk_size) { /* Cannot write destructively here, - the list may have - been in user space */ - rest = NIL; - hp = HAlloc(p, (got - chunk_size) * 2); - while (got-- > chunk_size) { - rest = CONS(hp, CAR(list_val(match_list)), rest); - hp += 2; - match_list = CDR(list_val(match_list)); - ++rest_size; - } - } - if (rest != NIL || slot_ix >= 0) { - hp = HAlloc(p,3+7); - continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), - tptr[3], tptr[4], rest, - make_small(rest_size)); - hp += 7; - RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE); - } else { - if (match_list != NIL) { - hp = HAlloc(p, 3); - RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE); - } else { - RET_TO_BIF(am_EOT, DB_ERROR_NONE); - } - } + /* We should only jump directly to this label if + * we've already called on_nothing_can_match / on_loop_ended / on_trap + */ + if (mpi.mp != NULL) { + erts_bin_free(mpi.mp); } - RET_TO_BIF(match_list,DB_ERROR_NONE); - -trap: - BUMP_ALL_REDS(p); - - hp = HAlloc(p,7); - continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3], - tptr[4], match_list, make_small(got)); - RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, - continuation), - DB_ERROR_NONE); - -#undef RET_TO_BIF - -} + if (mpi.lists != mpi.dlists) { + erts_free(ERTS_ALC_T_DB_SEL_LIST, + (void *) mpi.lists); + } + return ret_value; -static int db_select_hash(Process *p, DbTable *tbl, - Eterm pattern, int reverse, - Eterm *ret) -{ - return db_select_chunk_hash(p, tbl, pattern, 0, reverse, ret); } -static int db_select_chunk_hash(Process *p, DbTable *tbl, - Eterm pattern, Sint chunk_size, - int reverse, /* not used */ - Eterm *ret) +/* + * Continue hash table match traversal + */ +static int match_traverse_continue(Process* p, DbTableHash* tb, + Sint chunk_size, /* If 0, no chunking */ + Sint iterations_left, /* Nr. of iterations left */ + Eterm** hpp, /* Heap */ + Sint slot_ix, /* Slot index to resume traversal from */ + Sint got, /* Matched terms counter */ + Binary** mpp, /* Existing match program */ + int lock_for_write, /* Set to 1 if we're going to delete or + modify existing terms */ + mtraversal_on_match_res_t on_match_res, + mtraversal_on_loop_ended_t on_loop_ended, + mtraversal_on_trap_t on_trap, + void* context_ptr, /* For callbacks */ + Eterm* ret) { - DbTableHash *tb = &tbl->hash; - struct mp_info mpi; - Sint slot_ix; - HashDbTerm *current = 0; - unsigned current_list_pos = 0; - Eterm match_list; + int all_objects = (*mpp)->intern.flags & BIN_FLAG_ALL_OBJECTS; + HashDbTerm** current_ptr; /* Refers to either the bucket pointer or + * the 'next' pointer in the previous term + */ + HashDbTerm* saved_current; /* Helper to avoid double skip on match */ Eterm match_res; - Eterm *hp; - int num_left = 1000; - Uint got = 0; - Eterm continuation; - int errcode; - Eterm mpb; - erts_smp_rwmtx_t* lck; - - -#define RET_TO_BIF(Term,RetVal) do { \ - if (mpi.mp != NULL) { \ - erts_bin_free(mpi.mp); \ - } \ - if (mpi.lists != mpi.dlists) { \ - erts_free(ERTS_ALC_T_DB_SEL_LIST, \ - (void *) mpi.lists); \ - } \ - *ret = (Term); \ - return RetVal; \ - } while(0) - - - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { - RET_TO_BIF(NIL,errcode); - } - - if (!mpi.something_can_match) { - if (chunk_size) { - RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */ - } - RET_TO_BIF(NIL, DB_ERROR_NONE); - /* can't possibly match anything */ + erts_rwmtx_t* lck; + int ret_value; + erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue) + = (lock_for_write ? WLOCK_HASH : RLOCK_HASH); + void (*unlock_hash_function)(erts_rwmtx_t*) + = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH); + Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr) + = (lock_for_write ? next_slot_w : next_slot); + + if (got < 0) { + *ret = NIL; + return DB_ERROR_BADPARAM; + } + + if (slot_ix < 0 /* EOT */ + || (chunk_size && got >= chunk_size)) + { + /* Already got all or enough in the match_list */ + ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret); + goto done; } - if (!mpi.key_given) { - /* Run this code if pattern is variable or GETKEY(pattern) */ - /* is a variable */ - slot_ix = 0; - lck = RLOCK_HASH(tb,slot_ix); - for (;;) { - ASSERT(slot_ix < NACTIVE(tb)); - if ((current = BUCKET(tb,slot_ix)) != NULL) { - break; - } - slot_ix = next_slot(tb,slot_ix,&lck); - if (slot_ix == 0) { - if (chunk_size) { - RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */ - } - RET_TO_BIF(NIL,DB_ERROR_NONE); - } - } - } else { - /* We have at least one */ - slot_ix = mpi.lists[current_list_pos].ix; - lck = RLOCK_HASH(tb, slot_ix); - current = *(mpi.lists[current_list_pos].bucket); - ASSERT(current == BUCKET(tb,slot_ix)); - ++current_list_pos; + lck = lock_hash_function(tb, slot_ix); + if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */ + unlock_hash_function(lck); + *ret = NIL; + ret_value = DB_ERROR_BADPARAM; + goto done; } - match_list = NIL; - + /* + * Resume traversal cycle from where we left + */ + current_ptr = &BUCKET(tb,slot_ix); for(;;) { - if (current != NULL) { - if (current->hvalue != INVALID_HASH) { - match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0, - ¤t->dbterm, &hp, 2); - if (is_value(match_res)) { - match_list = CONS(hp, match_res, match_list); - ++got; - } - } - current = current->next; - } - else if (mpi.key_given) { /* Key is bound */ - RUNLOCK_HASH(lck); - if (current_list_pos == mpi.num_lists) { - slot_ix = -1; /* EOT */ - goto done; - } else { - slot_ix = mpi.lists[current_list_pos].ix; - lck = RLOCK_HASH(tb, slot_ix); - current = *(mpi.lists[current_list_pos].bucket); - ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); - ++current_list_pos; - } - } - else { /* Key is variable */ - --num_left; - - if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) { - slot_ix = -1; - break; - } - if (chunk_size && got >= chunk_size) { - RUNLOCK_HASH(lck); - break; - } - if (num_left <= 0 || MBUF(p)) { - /* - * We have either reached our limit, or just created some heap fragments. - * Since many heap fragments will make the GC slower, trap and GC now. - */ - RUNLOCK_HASH(lck); - goto trap; - } - current = BUCKET(tb,slot_ix); + if (*current_ptr != NULL) { + if ((*current_ptr)->hvalue != INVALID_HASH) { + match_res = db_match_dbterm(&tb->common, p, *mpp, all_objects, + &(*current_ptr)->dbterm, hpp, 2); + saved_current = *current_ptr; + if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) { + ++got; + } + --iterations_left; + if (*current_ptr != saved_current) { + /* Don't advance to next, the callback did it already */ + continue; + } + } + current_ptr = &((*current_ptr)->next); + } + else { + if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) { + slot_ix = -1; + break; + } + if (chunk_size && got >= chunk_size) { + unlock_hash_function(lck); + break; + } + if (iterations_left <= 0 || MBUF(p)) { + /* + * We have either reached our limit, or just created some heap fragments. + * Since many heap fragments will make the GC slower, trap and GC now. + */ + unlock_hash_function(lck); + ret_value = on_trap(context_ptr, slot_ix, got, mpp, ret); + goto done; + } + current_ptr = &BUCKET(tb,slot_ix); } } -done: - BUMP_REDS(p, 1000 - num_left); - if (chunk_size) { - Eterm continuation; - Eterm rest = NIL; - Sint rest_size = 0; - - if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - if (got > chunk_size) { /* Split list in return value and 'rest' */ - Eterm tmp = match_list; - rest = match_list; - while (got-- > chunk_size + 1) { - tmp = CDR(list_val(tmp)); - ++rest_size; - } - ++rest_size; - match_list = CDR(list_val(tmp)); - CDR(list_val(tmp)) = NIL; /* Destructive, the list has never - been in 'user space' */ - } - if (rest != NIL || slot_ix >= 0) { /* Need more calls */ - hp = HAlloc(p,3+7+PROC_BIN_SIZE); - mpb =db_make_mp_binary(p,(mpi.mp),&hp); - if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - continuation = TUPLE6(hp, tb->common.id,make_small(slot_ix), - make_small(chunk_size), - mpb, rest, - make_small(rest_size)); - mpi.mp = NULL; /*otherwise the return macro will destroy it */ - hp += 7; - RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE); - } else { /* All data is exhausted */ - if (match_list != NIL) { /* No more data to search but still a - result to return to the caller */ - hp = HAlloc(p, 3); - RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE); - } else { /* Reached the end of the ttable with no data to return */ - RET_TO_BIF(am_EOT, DB_ERROR_NONE); - } - } - } - RET_TO_BIF(match_list,DB_ERROR_NONE); -trap: - BUMP_ALL_REDS(p); - if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - hp = HAlloc(p,7+PROC_BIN_SIZE); - mpb =db_make_mp_binary(p,(mpi.mp),&hp); - continuation = TUPLE6(hp, tb->common.id, make_small(slot_ix), - make_small(chunk_size), - mpb, match_list, - make_small(got)); - mpi.mp = NULL; /*otherwise the return macro will destroy it */ - RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, - continuation), - DB_ERROR_NONE); -#undef RET_TO_BIF + ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret); + +done: + /* We should only jump directly to this label if + * we've already called on_loop_ended / on_trap + */ + return ret_value; } -static int db_select_count_hash(Process *p, - DbTable *tbl, - Eterm pattern, - Eterm *ret) + +/* + * Common traversal trapping/continuation code; + * used by select_count, select_delete and select_replace, + * as well as their continuation-handling counterparts. + */ + +static ERTS_INLINE int on_mtraversal_simple_trap(Export* trap_function, + Process* p, + DbTableHash* tb, + Eterm tid, + Eterm* prev_continuation_tptr, + Sint slot_ix, + Sint got, + Binary** mpp, + Eterm* ret) { - DbTableHash *tb = &tbl->hash; - struct mp_info mpi; - Uint slot_ix = 0; - HashDbTerm* current = NULL; - unsigned current_list_pos = 0; - Eterm *hp; - int num_left = 1000; - Uint got = 0; - Eterm continuation; - int errcode; + Eterm* hp; Eterm egot; Eterm mpb; - erts_smp_rwmtx_t* lck; - -#define RET_TO_BIF(Term,RetVal) do { \ - if (mpi.mp != NULL) { \ - erts_bin_free(mpi.mp); \ - } \ - if (mpi.lists != mpi.dlists) { \ - erts_free(ERTS_ALC_T_DB_SEL_LIST, \ - (void *) mpi.lists); \ - } \ - *ret = (Term); \ - return RetVal; \ - } while(0) - - - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { - RET_TO_BIF(NIL,errcode); - } - - if (!mpi.something_can_match) { - RET_TO_BIF(make_small(0), DB_ERROR_NONE); - /* can't possibly match anything */ - } - - if (!mpi.key_given) { - /* Run this code if pattern is variable or GETKEY(pattern) */ - /* is a variable */ - slot_ix = 0; - lck = RLOCK_HASH(tb,slot_ix); - current = BUCKET(tb,slot_ix); - } else { - /* We have at least one */ - slot_ix = mpi.lists[current_list_pos].ix; - lck = RLOCK_HASH(tb, slot_ix); - current = *(mpi.lists[current_list_pos].bucket); - ASSERT(current == BUCKET(tb,slot_ix)); - ++current_list_pos; - } + Eterm continuation; + int is_first_trap = (prev_continuation_tptr == NULL); + size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0); - for(;;) { - if (current != NULL) { - if (current->hvalue != INVALID_HASH) { - if (db_match_dbterm(&tb->common, p, mpi.mp, 0, - ¤t->dbterm, NULL,0) == am_true) { - ++got; - } - --num_left; - } - current = current->next; - } - else { /* next bucket */ - if (mpi.key_given) { /* Key is bound */ - RUNLOCK_HASH(lck); - if (current_list_pos == mpi.num_lists) { - goto done; - } else { - slot_ix = mpi.lists[current_list_pos].ix; - lck = RLOCK_HASH(tb, slot_ix); - current = *(mpi.lists[current_list_pos].bucket); - ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); - ++current_list_pos; - } - } - else { - if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) { - goto done; - } - if (num_left <= 0) { - RUNLOCK_HASH(lck); - goto trap; - } - current = BUCKET(tb,slot_ix); - } - } - } -done: - BUMP_REDS(p, 1000 - num_left); - RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); -trap: BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { - hp = HAlloc(p, PROC_BIN_SIZE + 5); + hp = HAllocX(p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE); egot = make_small(got); } else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5); + hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5, + ERTS_MAGIC_REF_THING_SIZE); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } - mpb = db_make_mp_binary(p,mpi.mp,&hp); - continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), - mpb, - egot); - mpi.mp = NULL; /*otherwise the return macro will destroy it */ - RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, - continuation), - DB_ERROR_NONE); -#undef RET_TO_BIF + if (is_first_trap) { + if (is_atom(tid)) + tid = erts_db_make_tid(p, &tb->common); + mpb = erts_db_make_match_prog_ref(p, *mpp, &hp); + *mpp = NULL; /* otherwise the caller will destroy it */ + } + else { + ASSERT(!is_atom(tid)); + mpb = prev_continuation_tptr[3]; + } + + continuation = TUPLE4( + hp, + tid, + make_small(slot_ix), + mpb, + egot); + *ret = bif_trap1(trap_function, p, continuation); + return DB_ERROR_NONE; } -static int db_select_delete_hash(Process *p, - DbTable *tbl, - Eterm pattern, - Eterm *ret) +static ERTS_INLINE int unpack_simple_mtraversal_continuation(Eterm continuation, + Eterm** tptr_ptr, + Eterm* tid_ptr, + Sint* slot_ix_p, + Binary** mpp, + Sint* got_p) { - DbTableHash *tb = &tbl->hash; - struct mp_info mpi; - Uint slot_ix = 0; - HashDbTerm **current = NULL; - unsigned current_list_pos = 0; - Eterm *hp; - int num_left = 1000; - Uint got = 0; - Eterm continuation; - int errcode; - Uint last_pseudo_delete = (Uint)-1; - Eterm mpb; - Eterm egot; -#ifdef ERTS_SMP - erts_aint_t fixated_by_me = tb->common.is_thread_safe ? 0 : 1; /* ToDo: something nicer */ -#else - erts_aint_t fixated_by_me = 0; -#endif - erts_smp_rwmtx_t* lck; - -#define RET_TO_BIF(Term,RetVal) do { \ - if (mpi.mp != NULL) { \ - erts_bin_free(mpi.mp); \ - } \ - if (mpi.lists != mpi.dlists) { \ - erts_free(ERTS_ALC_T_DB_SEL_LIST, \ - (void *) mpi.lists); \ - } \ - *ret = (Term); \ - return RetVal; \ - } while(0) - + Eterm* tptr; + ASSERT(is_tuple(continuation)); + tptr = tuple_val(continuation); + if (arityval(*tptr) != 4) + return 1; - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { - RET_TO_BIF(NIL,errcode); + if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) { + return 1; } - if (!mpi.something_can_match) { - RET_TO_BIF(make_small(0), DB_ERROR_NONE); - /* can't possibly match anything */ + *tptr_ptr = tptr; + *tid_ptr = tptr[1]; + *slot_ix_p = unsigned_val(tptr[2]); + *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]); + if (is_big(tptr[4])) { + *got_p = big_to_uint32(tptr[4]); } + else { + *got_p = unsigned_val(tptr[4]); + } + return 0; +} - if (!mpi.key_given) { - /* Run this code if pattern is variable or GETKEY(pattern) */ - /* is a variable */ - lck = WLOCK_HASH(tb,slot_ix); - current = &BUCKET(tb,slot_ix); - } else { - /* We have at least one */ - slot_ix = mpi.lists[current_list_pos].ix; - lck = WLOCK_HASH(tb, slot_ix); - current = mpi.lists[current_list_pos++].bucket; - ASSERT(*current == BUCKET(tb,slot_ix)); + +/* + * + * select / select_chunk match traversal + * + */ + +#define MAX_SELECT_CHUNK_ITERATIONS 1000 + +typedef struct { + Process* p; + DbTableHash* tb; + Eterm tid; + Eterm* hp; + Sint chunk_size; + Eterm match_list; + Eterm* prev_continuation_tptr; +} mtraversal_select_chunk_context_t; + +static int mtraversal_select_chunk_on_nothing_can_match(void* context_ptr, Eterm* ret) { + mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL); + return DB_ERROR_NONE; +} + +static int mtraversal_select_chunk_on_match_res(void* context_ptr, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) +{ + mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + if (is_value(match_res)) { + sc_context_ptr->match_list = CONS(sc_context_ptr->hp, match_res, sc_context_ptr->match_list); + return 1; } + return 0; +} +static int mtraversal_select_chunk_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret) +{ + mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + Eterm mpb; - for(;;) { - if ((*current) == NULL) { - if (mpi.key_given) { /* Key is bound */ - WUNLOCK_HASH(lck); - if (current_list_pos == mpi.num_lists) { - goto done; - } else { - slot_ix = mpi.lists[current_list_pos].ix; - lck = WLOCK_HASH(tb, slot_ix); - current = mpi.lists[current_list_pos].bucket; - ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); - ++current_list_pos; - } - } else { - if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) { - goto done; - } - if (num_left <= 0) { - WUNLOCK_HASH(lck); - goto trap; - } - current = &BUCKET(tb,slot_ix); - } - } - else if ((*current)->hvalue == INVALID_HASH) { - current = &((*current)->next); - } - else { - int did_erase = 0; - if (db_match_dbterm(&tb->common, p, mpi.mp, 0, - &(*current)->dbterm, NULL, 0) == am_true) { - HashDbTerm *del; - if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */ - if (slot_ix != last_pseudo_delete) { - if (!add_fixed_deletion(tb, slot_ix, fixated_by_me)) - goto do_erase; - last_pseudo_delete = slot_ix; - } - (*current)->hvalue = INVALID_HASH; - } else { - do_erase: - del = *current; - *current = (*current)->next; - free_term(tb, del); - did_erase = 1; - } - erts_smp_atomic_dec_nob(&tb->common.nitems); - ++got; - } - --num_left; - if (!did_erase) { - current = &((*current)->next); - } - } + if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) { + /* We didn't get to iterate a single time, which means EOT */ + ASSERT(sc_context_ptr->match_list == NIL); + *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL); + return DB_ERROR_NONE; } -done: - BUMP_REDS(p, 1000 - num_left); - if (got) { - try_shrink(tb); + else { + ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS); + BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + if (sc_context_ptr->chunk_size) { + Eterm continuation; + Eterm rest = NIL; + Sint rest_size = 0; + + if (got > sc_context_ptr->chunk_size) { /* Split list in return value and 'rest' */ + Eterm tmp = sc_context_ptr->match_list; + rest = sc_context_ptr->match_list; + while (got-- > sc_context_ptr->chunk_size + 1) { + tmp = CDR(list_val(tmp)); + ++rest_size; + } + ++rest_size; + sc_context_ptr->match_list = CDR(list_val(tmp)); + CDR(list_val(tmp)) = NIL; /* Destructive, the list has never + been in 'user space' */ + } + if (rest != NIL || slot_ix >= 0) { /* Need more calls */ + Eterm tid = sc_context_ptr->tid; + sc_context_ptr->hp = HAllocX(sc_context_ptr->p, + 3 + 7 + ERTS_MAGIC_REF_THING_SIZE, + ERTS_MAGIC_REF_THING_SIZE); + mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &sc_context_ptr->hp); + if (is_atom(tid)) + tid = erts_db_make_tid(sc_context_ptr->p, + &sc_context_ptr->tb->common); + continuation = TUPLE6( + sc_context_ptr->hp, + tid, + make_small(slot_ix), + make_small(sc_context_ptr->chunk_size), + mpb, rest, + make_small(rest_size)); + *mpp = NULL; /* Otherwise the caller will destroy it */ + sc_context_ptr->hp += 7; + *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, continuation); + return DB_ERROR_NONE; + } else { /* All data is exhausted */ + if (sc_context_ptr->match_list != NIL) { /* No more data to search but still a + result to return to the caller */ + sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3); + *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, am_EOT); + return DB_ERROR_NONE; + } else { /* Reached the end of the ttable with no data to return */ + *ret = am_EOT; + return DB_ERROR_NONE; + } + } + } + *ret = sc_context_ptr->match_list; + return DB_ERROR_NONE; } - RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); -trap: - BUMP_ALL_REDS(p); - if (IS_USMALL(0, got)) { - hp = HAlloc(p, PROC_BIN_SIZE + 5); - egot = make_small(got); +} + +static int mtraversal_select_chunk_on_trap(void* context_ptr, Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) +{ + mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + Eterm mpb; + Eterm continuation; + Eterm* hp; + + BUMP_ALL_REDS(sc_context_ptr->p); + + if (sc_context_ptr->prev_continuation_tptr == NULL) { + Eterm tid = sc_context_ptr->tid; + /* First time we're trapping */ + hp = HAllocX(sc_context_ptr->p, 7 + ERTS_MAGIC_REF_THING_SIZE, + ERTS_MAGIC_REF_THING_SIZE); + if (is_atom(tid)) + tid = erts_db_make_tid(sc_context_ptr->p, &sc_context_ptr->tb->common); + mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &hp); + continuation = TUPLE6( + hp, + tid, + make_small(slot_ix), + make_small(sc_context_ptr->chunk_size), + mpb, + sc_context_ptr->match_list, + make_small(got)); + *mpp = NULL; /* otherwise the caller will destroy it */ } else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5); - egot = uint_to_big(got, hp); - hp += BIG_UINT_HEAP_SIZE; - } - mpb = db_make_mp_binary(p,mpi.mp,&hp); - continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), - mpb, - egot); - mpi.mp = NULL; /*otherwise the return macro will destroy it */ - RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, - continuation), - DB_ERROR_NONE); + /* Not the first time we're trapping; reuse continuation terms */ + hp = HAlloc(sc_context_ptr->p, 7); + continuation = TUPLE6( + hp, + sc_context_ptr->prev_continuation_tptr[1], + make_small(slot_ix), + sc_context_ptr->prev_continuation_tptr[3], + sc_context_ptr->prev_continuation_tptr[4], + sc_context_ptr->match_list, + make_small(got)); + } + *ret = bif_trap1(&ets_select_continue_exp, sc_context_ptr->p, continuation); + return DB_ERROR_NONE; +} -#undef RET_TO_BIF +static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { + return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret); +} +static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, + int reverse, Eterm *ret) +{ + mtraversal_select_chunk_context_t sc_context; + sc_context.p = p; + sc_context.tb = &tbl->hash; + sc_context.tid = tid; + sc_context.hp = NULL; + sc_context.chunk_size = chunk_size; + sc_context.match_list = NIL; + sc_context.prev_continuation_tptr = NULL; + + return match_traverse( + sc_context.p, sc_context.tb, + pattern, NULL, + sc_context.chunk_size, + MAX_SELECT_CHUNK_ITERATIONS, + &sc_context.hp, 0, + mtraversal_select_chunk_on_nothing_can_match, + mtraversal_select_chunk_on_match_res, + mtraversal_select_chunk_on_loop_ended, + mtraversal_select_chunk_on_trap, + &sc_context, ret); } + /* -** This is called when select_delete traps -*/ -static int db_select_delete_continue_hash(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) + * + * select_continue match traversal + * + */ + +static int mtraversal_select_chunk_continue_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret) { - DbTableHash *tb = &tbl->hash; - Uint slot_ix; - Uint last_pseudo_delete = (Uint)-1; - HashDbTerm **current = NULL; - Eterm *hp; - int num_left = 1000; - Uint got; - Eterm *tptr; - Binary *mp; - Eterm egot; - int fixated_by_me = ONLY_WRITER(p,tb) ? 0 : 1; /* ToDo: something nicer */ - erts_smp_rwmtx_t* lck; + mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr; + Eterm continuation; + Eterm rest = NIL; + Eterm* hp; + + ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS); + BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + if (sc_context_ptr->chunk_size) { + Sint rest_size = 0; + if (got > sc_context_ptr->chunk_size) { + /* Cannot write destructively here, + the list may have + been in user space */ + hp = HAlloc(sc_context_ptr->p, (got - sc_context_ptr->chunk_size) * 2); + while (got-- > sc_context_ptr->chunk_size) { + rest = CONS(hp, CAR(list_val(sc_context_ptr->match_list)), rest); + hp += 2; + sc_context_ptr->match_list = CDR(list_val(sc_context_ptr->match_list)); + ++rest_size; + } + } + if (rest != NIL || slot_ix >= 0) { + hp = HAlloc(sc_context_ptr->p, 3 + 7); + continuation = TUPLE6( + hp, + sc_context_ptr->prev_continuation_tptr[1], + make_small(slot_ix), + sc_context_ptr->prev_continuation_tptr[3], + sc_context_ptr->prev_continuation_tptr[4], + rest, + make_small(rest_size)); + hp += 7; + *ret = TUPLE2(hp, sc_context_ptr->match_list, continuation); + return DB_ERROR_NONE; + } else { + if (sc_context_ptr->match_list != NIL) { + hp = HAlloc(sc_context_ptr->p, 3); + *ret = TUPLE2(hp, sc_context_ptr->match_list, am_EOT); + return DB_ERROR_NONE; + } else { + *ret = am_EOT; + return DB_ERROR_NONE; + } + } + } + *ret = sc_context_ptr->match_list; + return DB_ERROR_NONE; +} -#define RET_TO_BIF(Term,RetVal) do { \ - *ret = (Term); \ - return RetVal; \ - } while(0) +/* + * This is called when select traps + */ +static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { + mtraversal_select_chunk_context_t sc_context = {0}; + Eterm* tptr; + Eterm tid; + Binary* mp; + Sint got; + Sint slot_ix; + Sint chunk_size; + Eterm match_list; + Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS; - + /* Decode continuation. We know it's a tuple but not the arity or anything else */ + ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); - slot_ix = unsigned_val(tptr[2]); - mp = ((ProcBin *) binary_val(tptr[3]))->val; - if (is_big(tptr[4])) { - got = big_to_uint32(tptr[4]); - } else { - got = unsigned_val(tptr[4]); - } - - lck = WLOCK_HASH(tb,slot_ix); - if (slot_ix >= NACTIVE(tb)) { - WUNLOCK_HASH(lck); - goto done; - } - current = &BUCKET(tb,slot_ix); - for(;;) { - if ((*current) == NULL) { - if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) { - goto done; - } - if (num_left <= 0) { - WUNLOCK_HASH(lck); - goto trap; - } - current = &BUCKET(tb,slot_ix); - } - else if ((*current)->hvalue == INVALID_HASH) { - current = &((*current)->next); - } - else { - int did_erase = 0; - if (db_match_dbterm(&tb->common, p, mp, 0, - &(*current)->dbterm, NULL, 0) == am_true) { - HashDbTerm *del; - if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */ - if (slot_ix != last_pseudo_delete) { - if (!add_fixed_deletion(tb, slot_ix, fixated_by_me)) - goto do_erase; - last_pseudo_delete = slot_ix; - } - (*current)->hvalue = INVALID_HASH; - } else { - do_erase: - del = *current; - *current = (*current)->next; - free_term(tb, del); - did_erase = 1; - } - erts_smp_atomic_dec_nob(&tb->common.nitems); - ++got; - } - - --num_left; - if (!did_erase) { - current = &((*current)->next); - } - } - } -done: - BUMP_REDS(p, 1000 - num_left); - if (got) { - try_shrink(tb); - } - RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); -trap: - BUMP_ALL_REDS(p); - if (IS_USMALL(0, got)) { - hp = HAlloc(p, 5); - egot = make_small(got); - } - else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5); - egot = uint_to_big(got, hp); - hp += BIG_UINT_HEAP_SIZE; - } - continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), - tptr[3], - egot); - RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, - continuation), - DB_ERROR_NONE); + if (arityval(*tptr) != 6) + goto badparam; -#undef RET_TO_BIF + if (!is_small(tptr[2]) || !is_small(tptr[3]) || + !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6])) + goto badparam; + if ((chunk_size = signed_val(tptr[3])) < 0) + goto badparam; + + mp = erts_db_get_match_prog_binary(tptr[4]); + if (mp == NULL) + goto badparam; + if ((got = signed_val(tptr[6])) < 0) + goto badparam; + + tid = tptr[1]; + slot_ix = signed_val(tptr[2]); + match_list = tptr[5]; + + /* Proceed */ + sc_context.p = p; + sc_context.tb = &tbl->hash; + sc_context.tid = tid; + sc_context.hp = NULL; + sc_context.chunk_size = chunk_size; + sc_context.match_list = match_list; + sc_context.prev_continuation_tptr = tptr; + + return match_traverse_continue( + sc_context.p, sc_context.tb, sc_context.chunk_size, + iterations_left, &sc_context.hp, slot_ix, got, &mp, 0, + mtraversal_select_chunk_on_match_res, /* Reuse callback */ + mtraversal_select_chunk_continue_on_loop_ended, + mtraversal_select_chunk_on_trap, /* Reuse callback */ + &sc_context, ret); + +badparam: + *ret = NIL; + return DB_ERROR_BADPARAM; } - + +#undef MAX_SELECT_CHUNK_ITERATIONS + + /* -** This is called when select_count traps -*/ -static int db_select_count_continue_hash(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) + * + * select_count match traversal + * + */ + +#define MAX_SELECT_COUNT_ITERATIONS 1000 + +typedef struct { + Process* p; + DbTableHash* tb; + Eterm tid; + Eterm* hp; + Eterm* prev_continuation_tptr; +} mtraversal_select_count_context_t; + +static int mtraversal_select_count_on_nothing_can_match(void* context_ptr, Eterm* ret) { + *ret = make_small(0); + return DB_ERROR_NONE; +} + +static int mtraversal_select_count_on_match_res(void* context_ptr, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) { - DbTableHash *tb = &tbl->hash; - Uint slot_ix; - HashDbTerm* current; - Eterm *hp; - int num_left = 1000; - Uint got; - Eterm *tptr; - Binary *mp; - Eterm egot; - erts_smp_rwmtx_t* lck; + return (match_res == am_true); +} -#define RET_TO_BIF(Term,RetVal) do { \ - *ret = (Term); \ - return RetVal; \ - } while(0) +static int mtraversal_select_count_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret) +{ + mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr; + ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS); + BUMP_REDS(scnt_context_ptr->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left); + *ret = erts_make_integer(got, scnt_context_ptr->p); + return DB_ERROR_NONE; +} - - tptr = tuple_val(continuation); - slot_ix = unsigned_val(tptr[2]); - mp = ((ProcBin *) binary_val(tptr[3]))->val; - if (is_big(tptr[4])) { - got = big_to_uint32(tptr[4]); - } else { - got = unsigned_val(tptr[4]); - } - +static int mtraversal_select_count_on_trap(void* context_ptr, Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) +{ + mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr; + return on_mtraversal_simple_trap( + &ets_select_count_continue_exp, + scnt_context_ptr->p, + scnt_context_ptr->tb, + scnt_context_ptr->tid, + scnt_context_ptr->prev_continuation_tptr, + slot_ix, got, mpp, ret); +} - lck = RLOCK_HASH(tb, slot_ix); - if (slot_ix >= NACTIVE(tb)) { /* Is this posible? */ - RUNLOCK_HASH(lck); - goto done; - } - current = BUCKET(tb,slot_ix); - - for(;;) { - if (current != NULL) { - if (current->hvalue == INVALID_HASH) { - current = current->next; - continue; - } - if (db_match_dbterm(&tb->common, p, mp, 0, ¤t->dbterm, - NULL, 0) == am_true) { - ++got; - } - --num_left; - current = current->next; - } - else { /* next bucket */ - if ((slot_ix = next_slot(tb,slot_ix,&lck)) == 0) { - goto done; - } - if (num_left <= 0) { - RUNLOCK_HASH(lck); - goto trap; - } - current = BUCKET(tb,slot_ix); - } - } -done: - BUMP_REDS(p, 1000 - num_left); - RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); -trap: - BUMP_ALL_REDS(p); - if (IS_USMALL(0, got)) { - hp = HAlloc(p, 5); - egot = make_small(got); +static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { + mtraversal_select_count_context_t scnt_context = {0}; + Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS; + Sint chunk_size = 0; + + scnt_context.p = p; + scnt_context.tb = &tbl->hash; + scnt_context.tid = tid; + scnt_context.hp = NULL; + scnt_context.prev_continuation_tptr = NULL; + + return match_traverse( + scnt_context.p, scnt_context.tb, + pattern, NULL, + chunk_size, iterations_left, NULL, 0, + mtraversal_select_count_on_nothing_can_match, + mtraversal_select_count_on_match_res, + mtraversal_select_count_on_loop_ended, + mtraversal_select_count_on_trap, + &scnt_context, ret); +} + +/* + * This is called when select_count traps + */ +static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { + mtraversal_select_count_context_t scnt_context = {0}; + Eterm* tptr; + Eterm tid; + Binary* mp; + Sint got; + Sint slot_ix; + Sint chunk_size = 0; + *ret = NIL; + + if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + *ret = NIL; + return DB_ERROR_BADPARAM; + } + + scnt_context.p = p; + scnt_context.tb = &tbl->hash; + scnt_context.tid = tid; + scnt_context.hp = NULL; + scnt_context.prev_continuation_tptr = tptr; + + return match_traverse_continue( + scnt_context.p, scnt_context.tb, chunk_size, + MAX_SELECT_COUNT_ITERATIONS, + NULL, slot_ix, got, &mp, 0, + mtraversal_select_count_on_match_res, /* Reuse callback */ + mtraversal_select_count_on_loop_ended, /* Reuse callback */ + mtraversal_select_count_on_trap, /* Reuse callback */ + &scnt_context, ret); +} + +#undef MAX_SELECT_COUNT_ITERATIONS + + +/* + * + * select_delete match traversal + * + */ + +#define MAX_SELECT_DELETE_ITERATIONS 1000 + +typedef struct { + Process* p; + DbTableHash* tb; + Eterm tid; + Eterm* hp; + Eterm* prev_continuation_tptr; + erts_aint_t fixated_by_me; + Uint last_pseudo_delete; +} mtraversal_select_delete_context_t; + +static int mtraversal_select_delete_on_nothing_can_match(void* context_ptr, Eterm* ret) { + *ret = make_small(0); + return DB_ERROR_NONE; +} + +static int mtraversal_select_delete_on_match_res(void* context_ptr, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) +{ + HashDbTerm** current_ptr = *current_ptr_ptr; + mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; + HashDbTerm* del; + if (match_res != am_true) + return 0; + + if (NFIXED(sd_context_ptr->tb) > sd_context_ptr->fixated_by_me) { /* fixated by others? */ + if (slot_ix != sd_context_ptr->last_pseudo_delete) { + if (!add_fixed_deletion(sd_context_ptr->tb, slot_ix, sd_context_ptr->fixated_by_me)) + goto do_erase; + sd_context_ptr->last_pseudo_delete = slot_ix; + } + (*current_ptr)->hvalue = INVALID_HASH; } else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5); - egot = uint_to_big(got, hp); - hp += BIG_UINT_HEAP_SIZE; + do_erase: + del = *current_ptr; + *current_ptr = (*current_ptr)->next; // replace pointer to term using next + free_term(sd_context_ptr->tb, del); } - continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), - tptr[3], - egot); - RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, - continuation), - DB_ERROR_NONE); + erts_atomic_dec_nob(&sd_context_ptr->tb->common.nitems); -#undef RET_TO_BIF + return 1; +} +static int mtraversal_select_delete_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret) +{ + mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; + ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS); + BUMP_REDS(sd_context_ptr->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); + if (got) { + try_shrink(sd_context_ptr->tb); + } + *ret = erts_make_integer(got, sd_context_ptr->p); + return DB_ERROR_NONE; } - + +static int mtraversal_select_delete_on_trap(void* context_ptr, Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) +{ + mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr; + return on_mtraversal_simple_trap( + &ets_select_delete_continue_exp, + sd_context_ptr->p, + sd_context_ptr->tb, + sd_context_ptr->tid, + sd_context_ptr->prev_continuation_tptr, + slot_ix, got, mpp, ret); +} + +static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { + mtraversal_select_delete_context_t sd_context = {0}; + Sint chunk_size = 0; + + sd_context.p = p; + sd_context.tb = &tbl->hash; + sd_context.tid = tid; + sd_context.hp = NULL; + sd_context.prev_continuation_tptr = NULL; + sd_context.fixated_by_me = sd_context.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */ + sd_context.last_pseudo_delete = (Uint) -1; + + return match_traverse( + sd_context.p, sd_context.tb, + pattern, NULL, + chunk_size, + MAX_SELECT_DELETE_ITERATIONS, NULL, 1, + mtraversal_select_delete_on_nothing_can_match, + mtraversal_select_delete_on_match_res, + mtraversal_select_delete_on_loop_ended, + mtraversal_select_delete_on_trap, + &sd_context, ret); +} + +/* + * This is called when select_delete traps + */ +static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { + mtraversal_select_delete_context_t sd_context = {0}; + Eterm* tptr; + Eterm tid; + Binary* mp; + Sint got; + Sint slot_ix; + Sint chunk_size = 0; + + if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + *ret = NIL; + return DB_ERROR_BADPARAM; + } + + sd_context.p = p; + sd_context.tb = &tbl->hash; + sd_context.tid = tid; + sd_context.hp = NULL; + sd_context.prev_continuation_tptr = tptr; + sd_context.fixated_by_me = ONLY_WRITER(p, sd_context.tb) ? 0 : 1; /* TODO: something nicer */ + sd_context.last_pseudo_delete = (Uint) -1; + + return match_traverse_continue( + sd_context.p, sd_context.tb, chunk_size, + MAX_SELECT_DELETE_ITERATIONS, + NULL, slot_ix, got, &mp, 1, + mtraversal_select_delete_on_match_res, /* Reuse callback */ + mtraversal_select_delete_on_loop_ended, /* Reuse callback */ + mtraversal_select_delete_on_trap, /* Reuse callback */ + &sd_context, ret); +} + +#undef MAX_SELECT_DELETE_ITERATIONS + + +/* + * + * select_replace match traversal + * + */ + +#define MAX_SELECT_REPLACE_ITERATIONS 1000 + +typedef struct { + Process* p; + DbTableHash* tb; + Eterm tid; + Eterm* hp; + Eterm* prev_continuation_tptr; +} mtraversal_select_replace_context_t; + +static int mtraversal_select_replace_on_nothing_can_match(void* context_ptr, Eterm* ret) { + *ret = make_small(0); + return DB_ERROR_NONE; +} + +static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_ix, + HashDbTerm*** current_ptr_ptr, + Eterm match_res) +{ + mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; + DbTableHash* tb = sr_context_ptr->tb; + HashDbTerm* new; + HashDbTerm* next; + HashValue hval; + + if (is_value(match_res)) { +#ifdef DEBUG + Eterm key = db_getkey(tb->common.keypos, match_res); + ASSERT(is_value(key)); + ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl))); +#endif + next = (**current_ptr_ptr)->next; + hval = (**current_ptr_ptr)->hvalue; + new = new_dbterm(tb, match_res); + new->next = next; + new->hvalue = hval; + free_term(tb, **current_ptr_ptr); + **current_ptr_ptr = new; /* replace 'next' pointer in previous object */ + *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */ + return 1; + } + return 0; +} + +static int mtraversal_select_replace_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got, + Sint iterations_left, Binary** mpp, Eterm* ret) +{ + mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; + ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS); + /* the more objects we've replaced, the more reductions we've consumed */ + BUMP_REDS(sr_context_ptr->p, + MIN(MAX_SELECT_REPLACE_ITERATIONS * 2, + (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got)); + *ret = erts_make_integer(got, sr_context_ptr->p); + return DB_ERROR_NONE; +} + +static int mtraversal_select_replace_on_trap(void* context_ptr, Sint slot_ix, Sint got, + Binary** mpp, Eterm* ret) +{ + mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr; + return on_mtraversal_simple_trap( + &ets_select_replace_continue_exp, + sr_context_ptr->p, + sr_context_ptr->tb, + sr_context_ptr->tid, + sr_context_ptr->prev_continuation_tptr, + slot_ix, got, mpp, ret); +} + +static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) +{ + mtraversal_select_replace_context_t sr_context = {0}; + Sint chunk_size = 0; + + /* Bag implementation presented both semantic consistency and performance issues, + * unsupported for now + */ + ASSERT(!(tbl->hash.common.status & DB_BAG)); + + sr_context.p = p; + sr_context.tb = &tbl->hash; + sr_context.tid = tid; + sr_context.hp = NULL; + sr_context.prev_continuation_tptr = NULL; + + return match_traverse( + sr_context.p, sr_context.tb, + pattern, db_match_keeps_key, + chunk_size, + MAX_SELECT_REPLACE_ITERATIONS, NULL, 1, + mtraversal_select_replace_on_nothing_can_match, + mtraversal_select_replace_on_match_res, + mtraversal_select_replace_on_loop_ended, + mtraversal_select_replace_on_trap, + &sr_context, ret); +} + +/* + * This is called when select_replace traps + */ +static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) +{ + mtraversal_select_replace_context_t sr_context = {0}; + Eterm* tptr; + Eterm tid ; + Binary* mp; + Sint got; + Sint slot_ix; + Sint chunk_size = 0; + *ret = NIL; + + if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + *ret = NIL; + return DB_ERROR_BADPARAM; + } + + /* Proceed */ + sr_context.p = p; + sr_context.tb = &tbl->hash; + sr_context.tid = tid; + sr_context.hp = NULL; + sr_context.prev_continuation_tptr = tptr; + + return match_traverse_continue( + sr_context.p, sr_context.tb, chunk_size, + MAX_SELECT_REPLACE_ITERATIONS, + NULL, slot_ix, got, &mp, 1, + mtraversal_select_replace_on_match_res, /* Reuse callback */ + mtraversal_select_replace_on_loop_ended, /* Reuse callback */ + mtraversal_select_replace_on_trap, /* Reuse callback */ + &sr_context, ret); +} + + static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashDbTerm **bp, *b; HashValue hval = MAKE_HASH(key); - erts_smp_rwmtx_t *lck = WLOCK_HASH(tb, hval); + erts_rwmtx_t *lck = WLOCK_HASH(tb, hval); int ix = hash_to_ix(tb, hval); int nitems_diff = 0; @@ -2117,12 +2239,13 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) } WUNLOCK_HASH(lck); if (nitems_diff) { - erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff); + erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } return DB_ERROR_NONE; } + /* ** Other interface routines (not directly coupled to one bif) */ @@ -2138,7 +2261,7 @@ int db_mark_all_deleted_hash(DbTable *tbl) HashDbTerm* list; int i; - ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb)); + ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb)); for (i = 0; i < NACTIVE(tb); i++) { if ((list = BUCKET(tb,i)) != NULL) { @@ -2149,7 +2272,7 @@ int db_mark_all_deleted_hash(DbTable *tbl) }while(list != NULL); } } - erts_smp_atomic_set_nob(&tb->common.nitems, 0); + erts_atomic_set_nob(&tb->common.nitems, 0); return DB_ERROR_NONE; } @@ -2163,7 +2286,6 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb)); -#ifdef ERTS_SMP i = tbl->common.is_thread_safe; /* If crash dumping we set table to thread safe in order to avoid taking any locks */ @@ -2173,9 +2295,6 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) db_calc_stats_hash(&tbl->hash, &stats); tbl->common.is_thread_safe = i; -#else - db_calc_stats_hash(&tbl->hash, &stats); -#endif erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len); erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len); @@ -2219,19 +2338,17 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) /* release all memory occupied by a single table */ static int db_free_table_hash(DbTable *tbl) { - while (!db_free_table_continue_hash(tbl)) + while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0) ; return 0; } -static int db_free_table_continue_hash(DbTable *tbl) +static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds) { DbTableHash *tb = &tbl->hash; - int done; - FixedDeletion* fixdel = (FixedDeletion*) erts_smp_atomic_read_acqb(&tb->fixdel); - ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb)); + FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel); + ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE)); - done = 0; while (fixdel != NULL) { FixedDeletion *fx = fixdel; @@ -2241,25 +2358,23 @@ static int db_free_table_continue_hash(DbTable *tbl) (void *) fx, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); - if (++done >= 2*DELETE_RECORD_LIMIT) { - erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel); - return 0; /* Not done */ + if (--reds < 0) { + erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel); + return reds; /* Not done */ } } - erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL); + erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL); - done /= 2; while(tb->nslots != 0) { - free_seg(tb, 1); + reds -= EXT_SEGSZ/64 + free_seg(tb, 1); /* * If we have done enough work, get out here. */ - if (++done >= (DELETE_RECORD_LIMIT / CHAIN_LEN / SEGSZ)) { - return 0; /* Not done */ + if (reds < 0) { + return reds; /* Not done */ } } -#ifdef ERTS_SMP if (tb->locks != NULL) { int i; for (i=0; i<DB_HASH_LOCK_CNT; ++i) { @@ -2269,9 +2384,8 @@ static int db_free_table_continue_hash(DbTable *tbl) (void*)tb->locks, sizeof(DbTableHashFineLocks)); tb->locks = NULL; } -#endif - ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable)); - return 1; /* Done */ + ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable)); + return reds; /* Done */ } @@ -2284,7 +2398,8 @@ static int db_free_table_continue_hash(DbTable *tbl) ** slots should be searched. Also compiles the match program */ static int analyze_pattern(DbTableHash *tb, Eterm pattern, - struct mp_info *mpi) + extra_match_validator_t extra_validator, /* Optional callback */ + struct mp_info *mpi) { Eterm *ptpl; Eterm lst, tpl, ttpl; @@ -2322,7 +2437,10 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, i = 0; for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) { - Eterm body; + Eterm match; + Eterm guard; + Eterm body; + ttpl = CAR(list_val(lst)); if (!is_tuple(ttpl)) { if (buff != sbuff) { @@ -2337,9 +2455,17 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, } return DB_ERROR_BADPARAM; } - matches[i] = tpl = ptpl[1]; - guards[i] = ptpl[2]; + matches[i] = match = tpl = ptpl[1]; + guards[i] = guard = ptpl[2]; bodies[i] = body = ptpl[3]; + + if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) { + if (buff != sbuff) { + erts_free(ERTS_ALC_T_DB_TMP, buff); + } + return DB_ERROR_BADPARAM; + } + if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { mpi->all_objects = 0; @@ -2357,7 +2483,7 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, if (!db_has_variable(key)) { /* Bound key */ int ix, search_slot; HashDbTerm** bp; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); @@ -2414,69 +2540,58 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern, return DB_ERROR_NONE; } -static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix, - struct segment** old_segtab) +static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix) { - int nsegs; - struct ext_segment* eseg; + struct segment** old_segtab = SEGTAB(tb); + int nsegs = 0; + struct ext_segtab* est; + ASSERT(seg_ix >= NSEG_1); switch (seg_ix) { - case 0: nsegs = NSEG_1; break; - case 1: nsegs = NSEG_2; break; - default: nsegs = seg_ix + NSEG_INC; break; - } - eseg = (struct ext_segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, - (DbTable *) tb, - SIZEOF_EXTSEG(nsegs)); - ASSERT(eseg != NULL); - sys_memset(&eseg->s, 0, sizeof(struct segment)); - IF_DEBUG(eseg->s.is_ext_segment = 1); - eseg->prev_segtab = old_segtab; - eseg->nsegs = nsegs; - if (old_segtab) { - ASSERT(nsegs > tb->nsegs); - sys_memcpy(eseg->segtab, old_segtab, tb->nsegs*sizeof(struct segment*)); - } + case NSEG_1: nsegs = NSEG_2; break; + default: nsegs = seg_ix + NSEG_INC; break; + } + ASSERT(nsegs > tb->nsegs); + est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG, + (DbTable *) tb, + SIZEOF_EXT_SEGTAB(nsegs)); + est->nsegs = nsegs; + est->prev_segtab = old_segtab; + est->prev_nsegs = tb->nsegs; + sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*)); #ifdef DEBUG - sys_memset(&eseg->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*)); + sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*)); #endif - eseg->segtab[seg_ix] = &eseg->s; - return eseg; + return est; } /* Extend table with one new segment */ -static int alloc_seg(DbTableHash *tb) +static void alloc_seg(DbTableHash *tb) { - int seg_ix = tb->nslots >> SEGSZ_EXP; - - if (seg_ix+1 == tb->nsegs) { /* New segtab needed (extended segment) */ - struct segment** segtab = SEGTAB(tb); - struct ext_segment* seg = alloc_ext_seg(tb, seg_ix, segtab); - if (seg == NULL) return 0; - segtab[seg_ix] = &seg->s; - /* We don't use the new segtab until next call (see "shrink race") */ - } - else { /* Just a new plain segment */ - struct segment** segtab; - if (seg_ix == tb->nsegs) { /* Time to start use segtab from last call */ - struct ext_segment* eseg; - eseg = (struct ext_segment*) SEGTAB(tb)[seg_ix-1]; - MY_ASSERT(eseg!=NULL && eseg->s.is_ext_segment); - SET_SEGTAB(tb, eseg->segtab); - tb->nsegs = eseg->nsegs; - } - ASSERT(seg_ix < tb->nsegs); - segtab = SEGTAB(tb); - ASSERT(segtab[seg_ix] == NULL); - segtab[seg_ix] = (struct segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, - (DbTable *) tb, - sizeof(struct segment)); - if (segtab[seg_ix] == NULL) return 0; - sys_memset(segtab[seg_ix], 0, sizeof(struct segment)); - } - tb->nslots += SEGSZ; - return 1; + int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots); + struct segment** segtab; + + ASSERT(seg_ix > 0); + if (seg_ix == tb->nsegs) { /* New segtab needed */ + struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix); + SET_SEGTAB(tb, est->segtab); + tb->nsegs = est->nsegs; + } + ASSERT(seg_ix < tb->nsegs); + segtab = SEGTAB(tb); + segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG, + (DbTable *) tb, + SIZEOF_SEGMENT(EXT_SEGSZ)); + sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ)); + tb->nslots += EXT_SEGSZ; +} + +static void dealloc_ext_segtab(void* lop_data) +{ + struct ext_segtab* est = (struct ext_segtab*) lop_data; + + erts_free(ERTS_ALC_T_DB_SEG, est); } /* Shrink table by freeing the top segment @@ -2484,20 +2599,20 @@ static int alloc_seg(DbTableHash *tb) */ static int free_seg(DbTableHash *tb, int free_records) { - const int seg_ix = (tb->nslots >> SEGSZ_EXP) - 1; + const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1; struct segment** const segtab = SEGTAB(tb); - struct ext_segment* const top = (struct ext_segment*) segtab[seg_ix]; - int bytes; + struct segment* const segp = segtab[seg_ix]; + Uint seg_sz; int nrecords = 0; - ASSERT(top != NULL); + ASSERT(segp != NULL); #ifndef DEBUG if (free_records) #endif { - int i; - for (i=0; i<SEGSZ; ++i) { - HashDbTerm* p = top->s.buckets[i]; + int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ; + while (i--) { + HashDbTerm* p = segp->buckets[i]; while(p != 0) { HashDbTerm* nxt = p->next; ASSERT(free_records); /* segment not empty as assumed? */ @@ -2507,55 +2622,44 @@ static int free_seg(DbTableHash *tb, int free_records) } } } - - /* The "shrink race": - * We must avoid deallocating an extended segment while its segtab may - * still be used by other threads. - * The trick is to stop use a segtab one call earlier. That is, stop use - * a segtab when the segment above it is deallocated. When the segtab is - * later deallocated, it has not been used for a very long time. - * It is even theoretically safe as we have by then rehashed the entire - * segment, seizing *all* locks, so there cannot exist any retarded threads - * still hanging in BUCKET macro with an old segtab pointer. - * For this to work, we must of course allocate a new segtab one call - * earlier in alloc_seg() as well. And this is also the reason why - * the minimum size of the first segtab is 2 and not 1 (NSEG_1). - */ - if (seg_ix == tb->nsegs-1 || seg_ix==0) { /* Dealloc extended segment */ - MY_ASSERT(top->s.is_ext_segment); - ASSERT(segtab != top->segtab || seg_ix==0); - bytes = SIZEOF_EXTSEG(top->nsegs); - } - else { /* Dealloc plain segment */ - struct ext_segment* newtop = (struct ext_segment*) segtab[seg_ix-1]; - MY_ASSERT(!top->s.is_ext_segment); - - if (segtab == newtop->segtab) { /* New top segment is extended */ - MY_ASSERT(newtop->s.is_ext_segment); - if (newtop->prev_segtab != NULL) { - /* Time to use a smaller segtab */ - SET_SEGTAB(tb, newtop->prev_segtab); - tb->nsegs = seg_ix; - ASSERT(tb->nsegs == EXTSEG(SEGTAB(tb))->nsegs); - } - else { - ASSERT(NSEG_1 > 2 && seg_ix==1); - } - } - bytes = sizeof(struct segment); + if (seg_ix >= NSEG_1) { + struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab); + + if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */ + ASSERT(est->prev_segtab != NULL); + SET_SEGTAB(tb, est->prev_segtab); + tb->nsegs = est->prev_nsegs; + + if (!tb->common.is_thread_safe) { + /* + * Table is doing a graceful shrink operation and we must avoid + * deallocating this segtab while it may still be read by other + * threads. Schedule deallocation with thread progress to make + * sure no lingering threads are still hanging in BUCKET macro + * with an old segtab pointer. + */ + Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs); + ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est)); + ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0); + erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab, + est, + &est->lop, + sz); + } + else + erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est, + SIZEOF_EXT_SEGTAB(est->nsegs)); + } } + seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ; + erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz)); - erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, - (void*)top, bytes); #ifdef DEBUG - if (seg_ix > 0) { - segtab[seg_ix] = NULL; - } else { - SET_SEGTAB(tb, NULL); - } + if (seg_ix < tb->nsegs) + SEGTAB(tb)[seg_ix] = NULL; #endif - tb->nslots -= SEGSZ; + tb->nslots -= seg_sz; ASSERT(tb->nslots >= 0); return nrecords; } @@ -2605,103 +2709,106 @@ static ERTS_INLINE int begin_resizing(DbTableHash* tb) { if (DB_USING_FINE_LOCKING(tb)) - return !erts_smp_atomic_xchg_acqb(&tb->is_resizing, 1); - else { - if (erts_smp_atomic_read_nob(&tb->is_resizing)) - return 0; - erts_smp_atomic_set_nob(&tb->is_resizing, 1); - return 1; - } + return !erts_atomic_xchg_acqb(&tb->is_resizing, 1); + else + ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)); + return 1; } static ERTS_INLINE void done_resizing(DbTableHash* tb) { if (DB_USING_FINE_LOCKING(tb)) - erts_smp_atomic_set_relb(&tb->is_resizing, 0); - else - erts_smp_atomic_set_nob(&tb->is_resizing, 0); + erts_atomic_set_relb(&tb->is_resizing, 0); } -/* Grow table with one new bucket. +/* Grow table with one or more new buckets. ** Allocate new segment if needed. */ -static void grow(DbTableHash* tb, int nactive) +static void grow(DbTableHash* tb, int nitems) { HashDbTerm** pnext; HashDbTerm** to_pnext; HashDbTerm* p; - erts_smp_rwmtx_t* lck; - int from_ix; + erts_rwmtx_t* lck; + int nactive; + int from_ix, to_ix; int szm; + int loop_limit = 5; - if (!begin_resizing(tb)) - return; /* already in progress */ - if (NACTIVE(tb) != nactive) { - goto abort; /* already done (race) */ - } - - /* Ensure that the slot nactive exists */ - if (nactive == tb->nslots) { - /* Time to get a new segment */ - ASSERT((nactive & SEGSZ_MASK) == 0); - if (!alloc_seg(tb)) goto abort; - } - ASSERT(nactive < tb->nslots); + do { + if (!begin_resizing(tb)) + return; /* already in progress */ + nactive = NACTIVE(tb); + if (nitems <= GROW_LIMIT(nactive)) { + goto abort; /* already done (race) */ + } - szm = erts_smp_atomic_read_nob(&tb->szm); - if (nactive <= szm) { - from_ix = nactive & (szm >> 1); - } else { - ASSERT(nactive == szm+1); - from_ix = 0; - szm = (szm<<1) | 1; - } + /* Ensure that the slot nactive exists */ + if (nactive == tb->nslots) { + /* Time to get a new segment */ + ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0); + alloc_seg(tb); + } + ASSERT(nactive < tb->nslots); - lck = WLOCK_HASH(tb, from_ix); - /* Now a final double check (with the from_ix lock held) - * that we did not get raced by a table fixer. - */ - if (IS_FIXED(tb)) { - WUNLOCK_HASH(lck); - goto abort; - } - erts_smp_atomic_inc_nob(&tb->nactive); - if (from_ix == 0) { - if (DB_USING_FINE_LOCKING(tb)) - erts_smp_atomic_set_relb(&tb->szm, szm); - else - erts_smp_atomic_set_nob(&tb->szm, szm); - } - done_resizing(tb); + szm = erts_atomic_read_nob(&tb->szm); + if (nactive <= szm) { + from_ix = nactive & (szm >> 1); + } else { + ASSERT(nactive == szm+1); + from_ix = 0; + szm = (szm<<1) | 1; + } + to_ix = nactive; + + lck = WLOCK_HASH(tb, from_ix); + ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix)); + /* Now a final double check (with the from_ix lock held) + * that we did not get raced by a table fixer. + */ + if (IS_FIXED(tb)) { + WUNLOCK_HASH(lck); + goto abort; + } + erts_atomic_set_nob(&tb->nactive, ++nactive); + if (from_ix == 0) { + if (DB_USING_FINE_LOCKING(tb)) + erts_atomic_set_relb(&tb->szm, szm); + else + erts_atomic_set_nob(&tb->szm, szm); + } + done_resizing(tb); + + /* Finally, let's split the bucket. We try to do it in a smart way + to keep link order and avoid unnecessary updates of next-pointers */ + pnext = &BUCKET(tb, from_ix); + p = *pnext; + to_pnext = &BUCKET(tb, to_ix); + while (p != NULL) { + if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */ + *pnext = p->next; + free_term(tb, p); + p = *pnext; + } + else { + int ix = p->hvalue & szm; + if (ix != from_ix) { + ASSERT(ix == (from_ix ^ ((szm+1)>>1))); + *to_pnext = p; + /* Swap "from" and "to": */ + from_ix = ix; + to_pnext = pnext; + } + pnext = &p->next; + p = *pnext; + } + } + *to_pnext = NULL; + WUNLOCK_HASH(lck); - /* Finally, let's split the bucket. We try to do it in a smart way - to keep link order and avoid unnecessary updates of next-pointers */ - pnext = &BUCKET(tb, from_ix); - p = *pnext; - to_pnext = &BUCKET(tb, nactive); - while (p != NULL) { - if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */ - *pnext = p->next; - free_term(tb, p); - p = *pnext; - } - else { - int ix = p->hvalue & szm; - if (ix != from_ix) { - ASSERT(ix == (from_ix ^ ((szm+1)>>1))); - *to_pnext = p; - /* Swap "from" and "to": */ - from_ix = ix; - to_pnext = pnext; - } - pnext = &p->next; - p = *pnext; - } - } - *to_pnext = NULL; + }while (--loop_limit && nitems > GROW_LIMIT(nactive)); - WUNLOCK_HASH(lck); return; abort: @@ -2712,60 +2819,78 @@ abort: /* Shrink table by joining top bucket. ** Remove top segment if it gets empty. */ -static void shrink(DbTableHash* tb, int nactive) -{ - if (!begin_resizing(tb)) - return; /* already in progress */ - if (NACTIVE(tb) == nactive) { - erts_smp_rwmtx_t* lck; - int src_ix = nactive - 1; - int low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1; - int dst_ix = src_ix & low_szm; - - ASSERT(dst_ix < src_ix); - ASSERT(nactive > SEGSZ); - lck = WLOCK_HASH(tb, dst_ix); - /* Double check for racing table fixers */ - if (!IS_FIXED(tb)) { - HashDbTerm** src_bp = &BUCKET(tb, src_ix); - HashDbTerm** dst_bp = &BUCKET(tb, dst_ix); - HashDbTerm** bp = src_bp; - - /* Q: Why join lists by appending "dst" at the end of "src"? - A: Must step through "src" anyway to purge pseudo deleted. */ - while(*bp != NULL) { - if ((*bp)->hvalue == INVALID_HASH) { - HashDbTerm* deleted = *bp; - *bp = deleted->next; - free_term(tb, deleted); - } else { - bp = &(*bp)->next; - } - } - *bp = *dst_bp; - *dst_bp = *src_bp; - *src_bp = NULL; - - erts_smp_atomic_set_nob(&tb->nactive, src_ix); - if (dst_ix == 0) { - erts_smp_atomic_set_relb(&tb->szm, low_szm); - } - WUNLOCK_HASH(lck); - - if (tb->nslots - src_ix >= SEGSZ) { - free_seg(tb, 0); - } - } - else { - WUNLOCK_HASH(lck); - } +static void shrink(DbTableHash* tb, int nitems) +{ + HashDbTerm** src_bp; + HashDbTerm** dst_bp; + HashDbTerm** bp; + erts_rwmtx_t* lck; + int src_ix, dst_ix, low_szm; + int nactive; + int loop_limit = 5; - } - /*else already done */ + do { + if (!begin_resizing(tb)) + return; /* already in progress */ + nactive = NACTIVE(tb); + if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) { + goto abort; /* already done (race) */ + } + src_ix = nactive - 1; + low_szm = erts_atomic_read_nob(&tb->szm) >> 1; + dst_ix = src_ix & low_szm; + + ASSERT(dst_ix < src_ix); + ASSERT(nactive > FIRST_SEGSZ); + lck = WLOCK_HASH(tb, dst_ix); + ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix)); + /* Double check for racing table fixers */ + if (IS_FIXED(tb)) { + WUNLOCK_HASH(lck); + goto abort; + } + + src_bp = &BUCKET(tb, src_ix); + dst_bp = &BUCKET(tb, dst_ix); + bp = src_bp; + + /* + * We join lists by appending "dst" at the end of "src" + * as we must step through "src" anyway to purge pseudo deleted. + */ + while(*bp != NULL) { + if ((*bp)->hvalue == INVALID_HASH) { + HashDbTerm* deleted = *bp; + *bp = deleted->next; + free_term(tb, deleted); + } else { + bp = &(*bp)->next; + } + } + *bp = *dst_bp; + *dst_bp = *src_bp; + *src_bp = NULL; + + nactive = src_ix; + erts_atomic_set_nob(&tb->nactive, nactive); + if (dst_ix == 0) { + erts_atomic_set_relb(&tb->szm, low_szm); + } + WUNLOCK_HASH(lck); + + if (tb->nslots - src_ix >= EXT_SEGSZ) { + free_seg(tb, 0); + } + done_resizing(tb); + + } while (--loop_limit + && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)); + return; + +abort: done_resizing(tb); } - /* Search a list of tuples for a matching key */ static HashDbTerm* search_list(DbTableHash* tb, Eterm key, @@ -2784,14 +2909,14 @@ static HashDbTerm* search_list(DbTableHash* tb, Eterm key, /* It return the next live object in a table, NULL if no more */ /* In-bucket: RLOCKED */ /* Out-bucket: RLOCKED unless NULL */ -static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr, - HashDbTerm *list) +static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr, + HashDbTerm *list) { int i; - ERTS_SMP_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr)); + ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr)); - for (list = list->next; list != NULL; list = list->next) { + for ( ; list != NULL; list = list->next) { if (list->hvalue != INVALID_HASH) return list; } @@ -2819,7 +2944,7 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, DbTableHash *tb = &tbl->hash; HashValue hval; HashDbTerm **bp, *b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int flags = 0; ASSERT(tb->common.status & DB_SET); @@ -2875,7 +3000,7 @@ db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, q->next = next; q->hvalue = hval; *bp = b = q; - erts_smp_atomic_inc_nob(&tb->common.nitems); + erts_atomic_inc_nob(&tb->common.nitems); } HRelease(p, hend, htop); @@ -2901,10 +3026,10 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) DbTableHash *tb = &tbl->hash; HashDbTerm **bp = (HashDbTerm **) handle->bp; HashDbTerm *b = *bp; - erts_smp_rwmtx_t* lck = (erts_smp_rwmtx_t*) handle->lck; + erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck; HashDbTerm* free_me = NULL; - ERTS_SMP_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */ + ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */ ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE)); @@ -2918,7 +3043,7 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) } WUNLOCK_HASH(lck); - erts_smp_atomic_dec_nob(&tb->common.nitems); + erts_atomic_dec_nob(&tb->common.nitems); try_shrink(tb); } else { if (handle->flags & DB_MUST_RESIZE) { @@ -2927,12 +3052,12 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) } if (handle->flags & DB_INC_TRY_GROW) { int nactive; - int nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems); + int nitems = erts_atomic_inc_read_nob(&tb->common.nitems); WUNLOCK_HASH(lck); nactive = NACTIVE(tb); - if (nitems > nactive * (CHAIN_LEN + 1) && !IS_FIXED(tb)) { - grow(tb, nactive); + if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) { + grow(tb, nitems); } } else { WUNLOCK_HASH(lck); @@ -2955,7 +3080,7 @@ static int db_delete_all_objects_hash(Process* p, DbTable* tbl) } else { db_free_table_hash(tbl); db_create_hash(p, tbl); - erts_smp_atomic_set_nob(&tbl->hash.common.nitems, 0); + erts_atomic_set_nob(&tbl->hash.common.nitems, 0); } return 0; } @@ -2985,7 +3110,7 @@ void db_foreach_offheap_hash(DbTable *tbl, void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats) { HashDbTerm* b; - erts_smp_rwmtx_t* lck; + erts_rwmtx_t* lck; int sum = 0; int sq_sum = 0; int kept_items = 0; @@ -3016,24 +3141,30 @@ void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats) stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb))); stats->kept_items = kept_items; } -#ifdef HARDDEBUG -void db_check_table_hash(DbTable *tbl) +/* For testing only */ +Eterm erts_ets_hash_sizeof_ext_segtab(void) { - DbTableHash *tb = &tbl->hash; - HashDbTerm* list; - int j; - - for (j = 0; j < tb->nactive; j++) { - if ((list = BUCKET(tb,j)) != 0) { - while (list != 0) { - if (!is_tuple(make_tuple(list->dbterm.tpl))) { - erts_exit(ERTS_ERROR_EXIT, "Bad term in slot %d of ets table", j); - } - list = list->next; - } - } - } + return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1); } -#endif +#ifdef ERTS_ENABLE_LOCK_COUNT +void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) { + int i; + + if(tb->locks == NULL) { + return; + } + + for(i = 0; i < DB_HASH_LOCK_CNT; i++) { + erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt; + + if(enable) { + erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name, + ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB); + } else { + erts_lcnt_uninstall(ref); + } + } +} +#endif /* ERTS_ENABLE_LOCK_COUNT */ |