/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1998-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
** Implementation of unordered ETS tables.
** The tables are implemented as linear dynamic hash tables.
** https://en.wikipedia.org/wiki/Linear_hashing
*/
/* SMP:
** The hash table supports two different locking "modes",
** coarse grained and fine grained locking.
**
** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
** the right kind of lock on the entire table depending on operation (reading
** or writing). No further locking is then done by the table itself.
**
** Fine grained locking is supported by this code to allow concurrent updates
** (and reading) to different parts of the table. This works by keeping one
** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
** rehashing buckets can be done without exclusive table lock.
**
** A table will support fine grained locking if it is created with flag
** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
** if operations need to obtain fine grained locks or not. Some operations
** will for example always use exclusive table lock to guarantee
** a higher level of atomicity.
*/
/* FIXATION:
** Fixating the table, by ets:safe_fixtable or as done by select-operations,
** guarantees two things in current implementation.
** (1) Keys will not *totaly* disappear from the table. A key can thus be used
** as an iterator to find the next key in iteration sequence. Note however
** that this does not mean that (pointers to) table objects are guaranteed
** to be maintained while the table is fixated. A BAG or DBAG may actually
** remove objects as long as there is at least one object left in the table
** with the same key (alive or pseudo-deleted).
** (2) Objects will not be moved between buckets due to table grow/shrink.
** This will guarantee that iterations do not miss keys or get double-hits.
**
** With fine grained locking, a concurrent thread can fixate the table at any
** time. A "dangerous" operation (delete or move) therefore needs to check
** if the table is fixated while write-locking the bucket.
*/
/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "export.h"
#include "erl_binary.h"
#include "erl_db_hash.h"
/*
* The following symbols can be manipulated to "tune" the linear hash array
*/
#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
/*
** We want the first mandatory segment to be small (to reduce minimal footprint)
** and larger extra segments (to reduce number of alloc/free calls).
*/
/* Number of slots in first segment */
#define FIRST_SEGSZ_EXP 8
#define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP)
#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
/* Number of slots per extra segment */
#define EXT_SEGSZ_EXP 11
#define EXT_SEGSZ (1 << EXT_SEGSZ_EXP)
#define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
#define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
#define NSEG_2 256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */
# define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)
#ifdef ETHR_ORDERED_READ_DEPEND
#define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))
#else
#define SEGTAB(tb) \
(DB_USING_FINE_LOCKING(tb) \
? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab)) \
: ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)))
#endif
#define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive))
#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
/*
* When deleting a table, the number of records to delete.
* Approximate number, because we must delete entire buckets.
*/
#define DELETE_RECORD_LIMIT 10000
/* Calculate slot index from hash value.
** RLOCK_HASH or WLOCK_HASH must be done before.
*/
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
Uint mask = (DB_USING_FINE_LOCKING(tb)
? erts_atomic_read_acqb(&tb->szm)
: erts_atomic_read_nob(&tb->szm));
Uint ix = hval & mask;
if (ix >= erts_atomic_read_nob(&tb->nactive)) {
ix &= mask>>1;
ASSERT(ix < erts_atomic_read_nob(&tb->nactive));
}
return ix;
}
static ERTS_INLINE int link_fixdel(DbTableHash* tb,
FixedDeletion* fixd,
erts_aint_t fixated_by_me)
{
erts_aint_t was_next;
erts_aint_t exp_next;
was_next = erts_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic insertion in linked list: */
if (NFIXED(tb) <= fixated_by_me) {
erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
fixd, sizeof(FixedDeletion));
return 0; /* raced by unfixer */
}
exp_next = was_next;
fixd->next = (FixedDeletion*) exp_next;
was_next = erts_atomic_cmpxchg_mb(&tb->fixdel,
(erts_aint_t) fixd,
exp_next);
}while (was_next != exp_next);
return 1;
}
/* Remember a slot containing a pseudo-deleted item
* Return false if we got raced by unfixing thread
* and the object should be deleted for real.
*/
static int add_fixed_deletion(DbTableHash* tb, int ix,
erts_aint_t fixated_by_me)
{
FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
fixd->slot = ix;
fixd->all = 0;
return link_fixdel(tb, fixd, fixated_by_me);
}
static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p)
{
return p->pseudo_deleted;
}
/* optimised version of make_hash (normal case? atomic key) */
#define MAKE_HASH(term) \
((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
make_internal_hash(term, 0)) & MAX_HASH_MASK)
# define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
# define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
# define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
/* Fine grained read lock */
static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
erts_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
erts_rwmtx_rlock(lck);
return lck;
}
}
/* Fine grained write lock */
static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
erts_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
erts_rwmtx_rwlock(lck);
return lck;
}
}
static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck)
{
if (lck != NULL) {
erts_rwmtx_runlock(lck);
}
}
static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck)
{
if (lck != NULL) {
erts_rwmtx_rwunlock(lck);
}
}
#ifdef ERTS_ENABLE_LOCK_CHECK
# define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck))
# define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
#else
# define IS_HASH_RLOCKED(tb,hval) (1)
# define IS_HASH_WLOCKED(tb,hval) (1)
# define IS_TAB_WLOCKED(tb) (1)
#endif
/* Iteration helper
** Returns "next" slot index or 0 if EOT reached.
** Slot READ locks updated accordingly, unlocked if EOT.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
erts_rwmtx_t** lck_ptr)
{
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
RUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix);
return ix;
}
/* Same as next_slot but with WRITE locking */
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
erts_rwmtx_t** lck_ptr)
{
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
WUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix);
return ix;
}
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm));
}
static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p)
{
while (p) {
HashDbTerm* next = p->next;
free_term(tb, p);
p = next;
}
}
/*
* Local types
*/
struct mp_prefound {
HashDbTerm** bucket;
int ix;
};
struct mp_info {
int something_can_match; /* The match_spec is not "impossible" */
int key_given;
struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */
struct mp_prefound* lists; /* Buckets to search if keys are given,
* = dlists initially */
unsigned num_lists; /* Number of elements in "lists",
* = 0 initially */
Binary *mp; /* The compiled match program */
};
/* A table segment */
struct segment {
HashDbTerm* buckets[1];
};
#define SIZEOF_SEGMENT(N) \
(offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
/* An extended segment table */
struct ext_segtab {
ErtsThrPrgrLaterOp lop;
struct segment** prev_segtab; /* Used when table is shrinking */
int prev_nsegs; /* Size of prev_segtab */
int nsegs; /* Size of this segtab */
struct segment* segtab[1]; /* The segment table */
};
#define SIZEOF_EXT_SEGTAB(NSEGS) \
(offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
struct segment** segtab)
{
if (DB_USING_FINE_LOCKING(tb))
erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
else
erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
}
/* Used by select_replace on analyze_pattern */
typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
/*
** Forward decl's (static functions)
*/
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
static void alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list);
static void shrink(DbTableHash* tb, int nitems);
static void grow(DbTableHash* tb, int nitems);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash*);
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
/*
* Method interface functions
*/
static int db_first_hash(Process *p,
DbTable *tbl,
Eterm *ret);
static int db_next_hash(Process *p,
DbTable *tbl,
Eterm key,
Eterm *ret);
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_hash(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret);
static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_hash(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret);
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret);
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret);
static int db_select_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret);
static int db_select_count_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret);
static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret);
static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
static void db_print_hash(fmtfn_t to,
void *to_arg,
int show,
DbTable *tbl);
static int db_free_empty_table_hash(DbTable *tbl);
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
static void db_foreach_offheap_hash(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle* handle);
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
int nactive = NACTIVE(tb);
int nitems = NITEMS(tb);
if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
&& !IS_FIXED(tb)) {
shrink(tb, nitems);
}
}
/* Is this a live object (not pseodo-deleted) with the specified key?
*/
static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
if (b->hvalue != hval || is_pseudo_deleted(b))
return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
return EQ(key, itemKey);
}
}
/* Has this object the specified key? Can be pseudo-deleted.
*/
static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
if (b->hvalue != hval)
return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
return EQ(key, itemKey);
}
}
static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
{
HashDbTerm* p;
if (tb->common.compress) {
p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
}
else {
p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
}
return p;
}
static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
Eterm obj)
{
HashDbTerm* ret;
ASSERT(old != NULL);
if (tb->common.compress) {
ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
}
else {
ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
}
return ret;
}
/*
** External interface
*/
DbTableMethod db_hash =
{
db_create_hash,
db_first_hash,
db_next_hash,
db_first_hash, /* last == first */
db_next_hash, /* prev == next */
db_put_hash,
db_get_hash,
db_get_element_hash,
db_member_hash,
db_erase_hash,
db_erase_object_hash,
db_slot_hash,
db_select_chunk_hash,
db_select_hash,
db_select_delete_hash,
db_select_continue_hash, /* hmm continue_hash? */
db_select_delete_continue_hash,
db_select_count_hash,
db_select_count_continue_hash,
db_select_replace_hash,
db_select_replace_continue_hash,
db_take_hash,
db_delete_all_objects_hash,
db_free_empty_table_hash,
db_free_table_continue_hash,
db_print_hash,
db_foreach_offheap_hash,
db_lookup_dbterm_hash,
db_finalize_dbterm_hash
};
#ifdef DEBUG
/* Wait a while to provoke race and get code coverage */
static void DEBUG_WAIT(void)
{
unsigned long spin = 1UL << 20;
while (--spin);
}
#else
# define DEBUG_WAIT()
#endif
/* Rare case of restoring the rest of the fixdel list
when "unfixer" gets interrupted by "fixer" */
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
/*int tries = 0;*/
DEBUG_WAIT();
if (erts_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
(erts_aint_t) NULL) != (erts_aint_t) NULL) {
/* Oboy, must join lists */
FixedDeletion* last = fixdel;
erts_aint_t was_tail;
erts_aint_t exp_tail;
while (last->next != NULL) last = last->next;
was_tail = erts_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic list insertion */
exp_tail = was_tail;
last->next = (FixedDeletion*) exp_tail;
/*++tries;*/
DEBUG_WAIT();
was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
exp_tail);
}while (was_tail != exp_tail);
}
/*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
/*
** Table interface routines ie what's called by the bif's
*/
SWord db_unfix_table_hash(DbTableHash *tb)
{
FixedDeletion* fixdel;
SWord work = 0;
ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
|| (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock)
&& !tb->common.is_thread_safe));
restart:
fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel,
(erts_aint_t) NULL);
while (fixdel) {
FixedDeletion *free_me;
do {
HashDbTerm **bp;
HashDbTerm *b;
HashDbTerm *free_us = NULL;
erts_rwmtx_t* lck;
lck = WLOCK_HASH(tb, fixdel->slot);
if (IS_FIXED(tb)) { /* interrupted by fixer */
WUNLOCK_HASH(lck);
restore_fixdel(tb,fixdel);
if (!IS_FIXED(tb)) {
goto restart; /* unfixed again! */
}
return work;
}
if (fixdel->slot < NACTIVE(tb)) {
bp = &BUCKET(tb, fixdel->slot);
b = *bp;
while (b != NULL) {
if (is_pseudo_deleted(b)) {
HashDbTerm* nxt = b->next;
b->next = free_us;
free_us = b;
work++;
b = *bp = nxt;
} else {
bp = &b->next;
b = b->next;
}
}
}
/* else slot has been joined and purged by shrink() */
WUNLOCK_HASH(lck);
free_term_list(tb, free_us);
}while (fixdel->all && fixdel->slot-- > 0);
free_me = fixdel;
fixdel = fixdel->next;
erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
(void *) free_me,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
work++;
}
/* ToDo: Maybe try grow/shrink the table as well */
return work;
}
int db_create_hash(Process *p, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
SET_SEGTAB(tb, tb->first_segtab);
tb->nsegs = NSEG_1;
tb->nslots = FIRST_SEGSZ;
tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_SEGMENT(FIRST_SEGSZ));
sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));
erts_atomic_init_nob(&tb->is_resizing, 0);
if (tb->common.type & DB_FINE_LOCKED) {
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
int i;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
(DbTable *) tb,
sizeof(DbTableHashFineLocks));
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
erts_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
"db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
}
/* This important property is needed to guarantee the two buckets
* involved in a grow/shrink operation it protected by the same lock:
*/
ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
}
else { /* coarse locking */
tb->locks = NULL;
}
ERTS_THR_MEMORY_BARRIER;
return DB_ERROR_NONE;
}
static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
Uint ix = 0;
erts_rwmtx_t* lck = RLOCK_HASH(tb,ix);
HashDbTerm* list;
list = BUCKET(tb,ix);
list = next_live(tb, &ix, &lck, list);
if (list != NULL) {
*ret = db_copy_key(p, tbl, &list->dbterm);
RUNLOCK_HASH(lck);
}
else {
*ret = am_EOT;
}
return DB_ERROR_NONE;
}
static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
Uint ix;
HashDbTerm* b;
erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
b = BUCKET(tb, ix);
for (;;) {
if (b == NULL) {
RUNLOCK_HASH(lck);
return DB_ERROR_BADKEY;
}
if (has_key(tb, b, key, hval)) {
break;
}
b = b->next;
}
/* Key found */
b = next_live(tb, &ix, &lck, b->next);
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b != 0) {
if (!has_live_key(tb, b, key, hval)) {
break;
}
b = next_live(tb, &ix, &lck, b->next);
}
}
if (b == NULL) {
*ret = am_EOT;
}
else {
*ret = db_copy_key(p, tbl, &b->dbterm);
RUNLOCK_HASH(lck);
}
return DB_ERROR_NONE;
}
int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
Eterm key;
HashDbTerm** bp;
HashDbTerm* b;
HashDbTerm* q;
erts_rwmtx_t* lck;
int nitems;
int ret = DB_ERROR_NONE;
key = GETKEY(tb, tuple_val(obj));
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb, hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
for (;;) {
if (b == NULL) {
goto Lnew;
}
if (has_key(tb,b,key,hval)) {
break;
}
bp = &b->next;
b = b->next;
}
/* Key found
*/
if (tb->common.status & DB_SET) {
HashDbTerm* bnext = b->next;
if (is_pseudo_deleted(b)) {
erts_atomic_inc_nob(&tb->common.nitems);
b->pseudo_deleted = 0;
}
else if (key_clash_fail) {
ret = DB_ERROR_BADKEY;
goto Ldone;
}
q = replace_dbterm(tb, b, obj);
q->next = bnext;
ASSERT(q->hvalue == hval);
*bp = q;
goto Ldone;
}
else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
q = b;
do {
if (!is_pseudo_deleted(q)) {
ret = DB_ERROR_BADKEY;
goto Ldone;
}
q = q->next;
}while (q != NULL && has_key(tb,q,key,hval));
}
else if (tb->common.status & DB_BAG) {
HashDbTerm** qp = bp;
q = b;
do {
if (db_eq(&tb->common,obj,&q->dbterm)) {
if (is_pseudo_deleted(q)) {
erts_atomic_inc_nob(&tb->common.nitems);
q->pseudo_deleted = 0;
ASSERT(q->hvalue == hval);
if (q != b) { /* must move to preserve key insertion order */
*qp = q->next;
q->next = b;
*bp = q;
}
}
goto Ldone;
}
qp = &q->next;
q = *qp;
}while (q != NULL && has_key(tb,q,key,hval));
}
/*else DB_DUPLICATE_BAG */
Lnew:
q = new_dbterm(tb, obj);
q->hvalue = hval;
q->pseudo_deleted = 0;
q->next = b;
*bp = q;
nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
{
int nactive = NACTIVE(tb);
if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
grow(tb, nitems);
}
}
return DB_ERROR_NONE;
Ldone:
WUNLOCK_HASH(lck);
return ret;
}
static Eterm
get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
HashDbTerm *b1, HashDbTerm **bend)
{
HashDbTerm* b2 = b1->next;
Eterm copy;
Uint sz = b1->dbterm.size + 2;
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b2 && has_key(tb, b2, key, hval)) {
if (!is_pseudo_deleted(b2))
sz += b2->dbterm.size + 2;
b2 = b2->next;
}
}
copy = build_term_list(p, b1, b2, sz, tb);
if (bend) {
*bend = b2;
}
return copy;
}
int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b;
erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
b = BUCKET(tb, ix);
while(b != 0) {
if (has_live_key(tb, b, key, hval)) {
*ret = get_term_list(p, tb, key, hval, b, NULL);
goto done;
}
b = b->next;
}
*ret = NIL;
done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b1;
erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
ix = hash_to_ix(tb, hval);
lck = RLOCK_HASH(tb, hval);
b1 = BUCKET(tb, ix);
while(b1 != 0) {
if (has_live_key(tb,b1,key,hval)) {
*ret = am_true;
goto done;
}
b1 = b1->next;
}
*ret = am_false;
done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
static int db_get_element_hash(Process *p, DbTable *tbl,
Eterm key,
int ndex,
Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b1;
erts_rwmtx_t* lck;
int retval;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb, hval);
ix = hash_to_ix(tb, hval);
b1 = BUCKET(tb, ix);
while(b1 != 0) {
if (has_live_key(tb,b1,key,hval)) {
if (ndex > arityval(b1->dbterm.tpl[0])) {
retval = DB_ERROR_BADITEM;
goto done;
}
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
HashDbTerm* b;
HashDbTerm* b2 = b1->next;
Eterm elem_list = NIL;
while(b2 != NULL && has_key(tb,b2,key,hval)) {
if (ndex > arityval(b2->dbterm.tpl[0])
&& !is_pseudo_deleted(b2)) {
retval = DB_ERROR_BADITEM;
goto done;
}
b2 = b2->next;
}
b = b1;
while(b != b2) {
if (!is_pseudo_deleted(b)) {
Eterm *hp;
Eterm copy = db_copy_element_from_ets(&tb->common, p,
&b->dbterm, ndex, &hp, 2);
elem_list = CONS(hp, copy, elem_list);
}
b = b->next;
}
*ret = elem_list;
}
else {
Eterm* hp;
*ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
}
retval = DB_ERROR_NONE;
goto done;
}
b1 = b1->next;
}
retval = DB_ERROR_BADKEY;
done:
RUNLOCK_HASH(lck);
return retval;
}
/*
** NB, this is for the db_erase/2 bif.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm** bp;
HashDbTerm* b;
HashDbTerm* free_us = NULL;
erts_rwmtx_t* lck;
int nitems_diff = 0;
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
while(b != 0) {
if (has_live_key(tb,b,key,hval)) {
--nitems_diff;
if (nitems_diff == -1 && IS_FIXED(tb)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
b->pseudo_deleted = 1;
} else {
HashDbTerm* next = b->next;
b->next = free_us;
free_us = b;
b = *bp = next;
continue;
}
}
else {
if (nitems_diff && !is_pseudo_deleted(b))
break;
}
bp = &b->next;
b = b->next;
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
free_term_list(tb, free_us);
*ret = am_true;
return DB_ERROR_NONE;
}
/*
** This is for the ets:delete_object BIF
*/
static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm** bp;
HashDbTerm* b;
HashDbTerm* free_us = NULL;
erts_rwmtx_t* lck;
int nitems_diff = 0;
int nkeys = 0;
Eterm key;
key = GETKEY(tb, tuple_val(object));
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
while(b != 0) {
if (has_live_key(tb,b,key,hval)) {
++nkeys;
if (db_eq(&tb->common,object, &b->dbterm)) {
--nitems_diff;
if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
b->pseudo_deleted = 1;
bp = &b->next;
b = b->next;
} else {
HashDbTerm* next = b->next;
b->next = free_us;
free_us = b;
b = *bp = next;
}
if (tb->common.status & (DB_DUPLICATE_BAG)) {
continue;
} else {
break;
}
}
}
else if (nitems_diff && !is_pseudo_deleted(b)) {
break;
}
bp = &b->next;
b = b->next;
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
free_term_list(tb, free_us);
*ret = am_true;
return DB_ERROR_NONE;
}
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
erts_rwmtx_t* lck;
Sint slot;
int retval;
int nactive;
if (is_not_small(slot_term) || ((slot = signed_val(slot_term)) < 0)) {
return DB_ERROR_BADPARAM;
}
lck = RLOCK_HASH(tb, slot);
nactive = NACTIVE(tb);
if (slot < nactive) {
*ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb);
retval = DB_ERROR_NONE;
}
else if (slot == nactive) {
*ret = am_EOT;
retval = DB_ERROR_NONE;
}
else {
retval = DB_ERROR_BADPARAM;
}
RUNLOCK_HASH(lck);
return retval;
}
/*
* Match traversal callbacks
*/
typedef struct match_callbacks_t_ match_callbacks_t;
struct match_callbacks_t_
{
/* Called when no match is possible.
* context_ptr: Pointer to context
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
*/
int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret);
/* Called for each match result.
* context_ptr: Pointer to context
* slot_ix: Current slot index
* current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
* can be (carefully) used to adjust iteration when deleting or replacing elements.
* match_res: The result of running the match program against the current term.
*
* Should return 1 for successful match, 0 otherwise.
*/
int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix,
HashDbTerm*** current_ptr_ptr, Eterm match_res);
/* Called when either we've matched enough elements in this cycle or EOT was reached.
* context_ptr: Pointer to context
* slot_ix: Current slot index
* got: How many elements have been matched so far
* iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
* mpp: Double pointer to the compiled match program
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
* If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
*/
int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret);
/* Called when it's time to trap
* context_ptr: Pointer to context
* slot_ix: Current slot index
* got: How many elements have been matched so far
* mpp: Double pointer to the compiled match program
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
* If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
*/
int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp,
Eterm* ret);
};
/*
* Begin hash table match traversal
*/
static int match_traverse(Process* p, DbTableHash* tb,
Eterm pattern,
extra_match_validator_t extra_match_validator, /* Optional */
Sint chunk_size, /* If 0, no chunking */
Sint iterations_left, /* Nr. of iterations left */
Eterm** hpp, /* Heap */
int lock_for_write, /* Set to 1 if we're going to delete or
modify existing terms */
match_callbacks_t* ctx,
Eterm* ret)
{
Sint slot_ix; /* Slot index */
HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
* the 'next' pointer in the previous term
*/
HashDbTerm* saved_current; /* Helper to avoid double skip on match */
struct mp_info mpi;
unsigned current_list_pos = 0; /* Prefound buckets list index */
Eterm match_res;
Sint got = 0; /* Matched terms counter */
erts_rwmtx_t* lck; /* Slot lock */
int ret_value;
erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
= (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
void (*unlock_hash_function)(erts_rwmtx_t*)
= (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**)
= (lock_for_write ? next_slot_w : next_slot);
if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
!= DB_ERROR_NONE)
{
*ret = NIL;
goto done;
}
if (!mpi.something_can_match) {
/* Can't possibly match anything */
ret_value = ctx->on_nothing_can_match(ctx, ret);
goto done;
}
/*
* Look for initial slot / bucket
*/
if (!mpi.key_given) {
/* Run this code if pattern is variable or GETKEY(pattern) */
/* is a variable */
slot_ix = 0;
lck = lock_hash_function(tb,slot_ix);
for (;;) {
ASSERT(slot_ix < NACTIVE(tb));
if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
break;
}
slot_ix = next_slot_function(tb,slot_ix,&lck);
if (slot_ix == 0) {
ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left,
&mpi.mp, ret);
goto done;
}
}
} else {
/* We have at least one */
slot_ix = mpi.lists[current_list_pos].ix;
lck = lock_hash_function(tb, slot_ix);
current_ptr = mpi.lists[current_list_pos].bucket;
ASSERT(*current_ptr == BUCKET(tb,slot_ix));
++current_list_pos;
}
/*
* Execute traversal cycle
*/
for(;;) {
if (*current_ptr != NULL) {
if (!is_pseudo_deleted(*current_ptr)) {
match_res = db_match_dbterm(&tb->common, p, mpi.mp,
&(*current_ptr)->dbterm, hpp, 2);
saved_current = *current_ptr;
if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) {
++got;
}
--iterations_left;
if (*current_ptr != saved_current) {
/* Don't advance to next, the callback did it already */
continue;
}
}
current_ptr = &((*current_ptr)->next);
}
else if (mpi.key_given) { /* Key is bound */
unlock_hash_function(lck);
if (current_list_pos == mpi.num_lists) {
ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret);
goto done;
} else {
slot_ix = mpi.lists[current_list_pos].ix;
lck = lock_hash_function(tb, slot_ix);
current_ptr = mpi.lists[current_list_pos].bucket;
ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
++current_list_pos;
}
}
else { /* Key is variable */
if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
slot_ix = -1;
break;
}
if (chunk_size && got >= chunk_size) {
unlock_hash_function(lck);
break;
}
if (iterations_left <= 0 || MBUF(p)) {
/*
* We have either reached our limit, or just created some heap fragments.
* Since many heap fragments will make the GC slower, trap and GC now.
*/
unlock_hash_function(lck);
ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret);
goto done;
}
current_ptr = &BUCKET(tb,slot_ix);
}
}
ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret);
done:
/* We should only jump directly to this label if
* we've already called ctx->nothing_can_match / loop_ended / trap
*/
if (mpi.mp != NULL) {
erts_bin_free(mpi.mp);
}
if (mpi.lists != mpi.dlists) {
erts_free(ERTS_ALC_T_DB_SEL_LIST,
(void *) mpi.lists);
}
return ret_value;
}
/*
* Continue hash table match traversal
*/
static int match_traverse_continue(Process* p, DbTableHash* tb,
Sint chunk_size, /* If 0, no chunking */
Sint iterations_left, /* Nr. of iterations left */
Eterm** hpp, /* Heap */
Sint slot_ix, /* Slot index to resume traversal from */
Sint got, /* Matched terms counter */
Binary** mpp, /* Existing match program */
int lock_for_write, /* Set to 1 if we're going to delete or
modify existing terms */
match_callbacks_t* ctx,
Eterm* ret)
{
HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
* the 'next' pointer in the previous term
*/
HashDbTerm* saved_current; /* Helper to avoid double skip on match */
Eterm match_res;
erts_rwmtx_t* lck;
int ret_value;
erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
= (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
void (*unlock_hash_function)(erts_rwmtx_t*)
= (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr)
= (lock_for_write ? next_slot_w : next_slot);
if (got < 0) {
*ret = NIL;
return DB_ERROR_BADPARAM;
}
if (slot_ix < 0 /* EOT */
|| (chunk_size && got >= chunk_size))
{
/* Already got all or enough in the match_list */
ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
goto done;
}
lck = lock_hash_function(tb, slot_ix);
if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
unlock_hash_function(lck);
*ret = NIL;
ret_value = DB_ERROR_BADPARAM;
goto done;
}
/*
* Resume traversal cycle from where we left
*/
current_ptr = &BUCKET(tb,slot_ix);
for(;;) {
if (*current_ptr != NULL) {
if (!is_pseudo_deleted(*current_ptr)) {
match_res = db_match_dbterm(&tb->common, p, *mpp,
&(*current_ptr)->dbterm, hpp, 2);
saved_current = *current_ptr;
if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) {
++got;
}
--iterations_left;
if (*current_ptr != saved_current) {
/* Don't advance to next, the callback did it already */
continue;
}
}
current_ptr = &((*current_ptr)->next);
}
else {
if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
slot_ix = -1;
break;
}
if (chunk_size && got >= chunk_size) {
unlock_hash_function(lck);
break;
}
if (iterations_left <= 0 || MBUF(p)) {
/*
* We have either reached our limit, or just created some heap fragments.
* Since many heap fragments will make the GC slower, trap and GC now.
*/
unlock_hash_function(lck);
ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret);
goto done;
}
current_ptr = &BUCKET(tb,slot_ix);
}
}
ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
done:
/* We should only jump directly to this label if
* we've already called on_loop_ended / on_trap
*/
return ret_value;
}
/*
* Common traversal trapping/continuation code;
* used by select_count, select_delete and select_replace,
* as well as their continuation-handling counterparts.
*/
static ERTS_INLINE int on_simple_trap(Export* trap_function,
Process* p,
DbTableHash* tb,
Eterm tid,
Eterm* prev_continuation_tptr,
Sint slot_ix,
Sint got,
Binary** mpp,
Eterm* ret)
{
Eterm* hp;
Eterm egot;
Eterm mpb;
Eterm continuation;
int is_first_trap = (prev_continuation_tptr == NULL);
size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);
BUMP_ALL_REDS(p);
if (IS_USMALL(0, got)) {
hp = HAllocX(p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE);
egot = make_small(got);
}
else {
hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5,
ERTS_MAGIC_REF_THING_SIZE);
egot = uint_to_big(got, hp);
hp += BIG_UINT_HEAP_SIZE;
}
if (is_first_trap) {
if (is_atom(tid))
tid = erts_db_make_tid(p, &tb->common);
mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
*mpp = NULL; /* otherwise the caller will destroy it */
}
else {
ASSERT(!is_atom(tid));
mpb = prev_continuation_tptr[3];
}
continuation = TUPLE4(
hp,
tid,
make_small(slot_ix),
mpb,
egot);
ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation);
return DB_ERROR_NONE;
}
static ERTS_INLINE int unpack_simple_continuation(Eterm continuation,
Eterm** tptr_ptr,
Eterm* tid_ptr,
Sint* slot_ix_p,
Binary** mpp,
Sint* got_p)
{
Eterm* tptr;
ASSERT(is_tuple(continuation));
tptr = tuple_val(continuation);
if (arityval(*tptr) != 4)
return 1;
if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
return 1;
}
*tptr_ptr = tptr;
*tid_ptr = tptr[1];
*slot_ix_p = unsigned_val(tptr[2]);
*mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
if (is_big(tptr[4])) {
*got_p = big_to_uint32(tptr[4]);
}
else {
*got_p = unsigned_val(tptr[4]);
}
return 0;
}
/*
*
* select / select_chunk match traversal
*
*/
#define MAX_SELECT_CHUNK_ITERATIONS 1000
typedef struct {
match_callbacks_t base;
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* hp;
Sint chunk_size;
Eterm match_list;
Eterm* prev_continuation_tptr;
} select_chunk_context_t;
static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret)
{
select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
*ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
return DB_ERROR_NONE;
}
static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
if (is_value(match_res)) {
ctx->match_list = CONS(ctx->hp, match_res, ctx->match_list);
return 1;
}
return 0;
}
static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp,
Eterm* ret)
{
select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
Eterm mpb;
if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
/* We didn't get to iterate a single time, which means EOT */
ASSERT(ctx->match_list == NIL);
*ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
return DB_ERROR_NONE;
}
else {
ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
if (ctx->chunk_size) {
Eterm continuation;
Eterm rest = NIL;
Sint rest_size = 0;
if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */
Eterm tmp = ctx->match_list;
rest = ctx->match_list;
while (got-- > ctx->chunk_size + 1) {
tmp = CDR(list_val(tmp));
++rest_size;
}
++rest_size;
ctx->match_list = CDR(list_val(tmp));
CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
been in 'user space' */
}
if (rest != NIL || slot_ix >= 0) { /* Need more calls */
Eterm tid = ctx->tid;
ctx->hp = HAllocX(ctx->p,
3 + 7 + ERTS_MAGIC_REF_THING_SIZE,
ERTS_MAGIC_REF_THING_SIZE);
mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp);
if (is_atom(tid))
tid = erts_db_make_tid(ctx->p,
&ctx->tb->common);
continuation = TUPLE6(
ctx->hp,
tid,
make_small(slot_ix),
make_small(ctx->chunk_size),
mpb, rest,
make_small(rest_size));
*mpp = NULL; /* Otherwise the caller will destroy it */
ctx->hp += 7;
*ret = TUPLE2(ctx->hp, ctx->match_list, continuation);
return DB_ERROR_NONE;
} else { /* All data is exhausted */
if (ctx->match_list != NIL) { /* No more data to search but still a
result to return to the caller */
ctx->hp = HAlloc(ctx->p, 3);
*ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT);
return DB_ERROR_NONE;
} else { /* Reached the end of the ttable with no data to return */
*ret = am_EOT;
return DB_ERROR_NONE;
}
}
}
*ret = ctx->match_list;
return DB_ERROR_NONE;
}
}
static int select_chunk_on_trap(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
Eterm mpb;
Eterm continuation;
Eterm* hp;
BUMP_ALL_REDS(ctx->p);
if (ctx->prev_continuation_tptr == NULL) {
Eterm tid = ctx->tid;
/* First time we're trapping */
hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE,
ERTS_MAGIC_REF_THING_SIZE);
if (is_atom(tid))
tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp);
continuation = TUPLE6(
hp,
tid,
make_small(slot_ix),
make_small(ctx->chunk_size),
mpb,
ctx->match_list,
make_small(got));
*mpp = NULL; /* otherwise the caller will destroy it */
}
else {
/* Not the first time we're trapping; reuse continuation terms */
hp = HAlloc(ctx->p, 7);
continuation = TUPLE6(
hp,
ctx->prev_continuation_tptr[1],
make_small(slot_ix),
ctx->prev_continuation_tptr[3],
ctx->prev_continuation_tptr[4],
ctx->match_list,
make_small(got));
}
ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p,
continuation);
return DB_ERROR_NONE;
}
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern,
int reverse, Eterm *ret)
{
return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
}
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret)
{
select_chunk_context_t ctx;
ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match;
ctx.base.on_match_res = select_chunk_on_match_res;
ctx.base.on_loop_ended = select_chunk_on_loop_ended;
ctx.base.on_trap = select_chunk_on_trap,
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.hp = NULL;
ctx.chunk_size = chunk_size;
ctx.match_list = NIL;
ctx.prev_continuation_tptr = NULL;
return match_traverse(
ctx.p, ctx.tb,
pattern, NULL,
ctx.chunk_size,
MAX_SELECT_CHUNK_ITERATIONS,
&ctx.hp, 0,
&ctx.base, ret);
}
/*
*
* select_continue match traversal
*
*/
static
int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp,
Eterm* ret)
{
select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
Eterm continuation;
Eterm rest = NIL;
Eterm* hp;
ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
if (ctx->chunk_size) {
Sint rest_size = 0;
if (got > ctx->chunk_size) {
/* Cannot write destructively here,
the list may have
been in user space */
hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2);
while (got-- > ctx->chunk_size) {
rest = CONS(hp, CAR(list_val(ctx->match_list)), rest);
hp += 2;
ctx->match_list = CDR(list_val(ctx->match_list));
++rest_size;
}
}
if (rest != NIL || slot_ix >= 0) {
hp = HAlloc(ctx->p, 3 + 7);
continuation = TUPLE6(
hp,
ctx->prev_continuation_tptr[1],
make_small(slot_ix),
ctx->prev_continuation_tptr[3],
ctx->prev_continuation_tptr[4],
rest,
make_small(rest_size));
hp += 7;
*ret = TUPLE2(hp, ctx->match_list, continuation);
return DB_ERROR_NONE;
} else {
if (ctx->match_list != NIL) {
hp = HAlloc(ctx->p, 3);
*ret = TUPLE2(hp, ctx->match_list, am_EOT);
return DB_ERROR_NONE;
} else {
*ret = am_EOT;
return DB_ERROR_NONE;
}
}
}
*ret = ctx->match_list;
return DB_ERROR_NONE;
}
/*
* This is called when select traps
*/
static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation,
Eterm* ret)
{
select_chunk_context_t ctx;
Eterm* tptr;
Eterm tid;
Binary* mp;
Sint got;
Sint slot_ix;
Sint chunk_size;
Eterm match_list;
Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;
/* Decode continuation. We know it's a tuple but not the arity or anything else */
ASSERT(is_tuple(continuation));
tptr = tuple_val(continuation);
if (arityval(*tptr) != 6)
goto badparam;
if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
!(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
goto badparam;
if ((chunk_size = signed_val(tptr[3])) < 0)
goto badparam;
mp = erts_db_get_match_prog_binary(tptr[4]);
if (mp == NULL)
goto badparam;
if ((got = signed_val(tptr[6])) < 0)
goto badparam;
tid = tptr[1];
slot_ix = signed_val(tptr[2]);
match_list = tptr[5];
/* Proceed */
ctx.base.on_match_res = select_chunk_on_match_res;
ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended;
ctx.base.on_trap = select_chunk_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.hp = NULL;
ctx.chunk_size = chunk_size;
ctx.match_list = match_list;
ctx.prev_continuation_tptr = tptr;
return match_traverse_continue(
ctx.p, ctx.tb, ctx.chunk_size,
iterations_left, &ctx.hp, slot_ix, got, &mp, 0,
&ctx.base, ret);
badparam:
*ret = NIL;
return DB_ERROR_BADPARAM;
}
#undef MAX_SELECT_CHUNK_ITERATIONS
/*
*
* select_count match traversal
*
*/
#define MAX_SELECT_COUNT_ITERATIONS 1000
typedef struct {
match_callbacks_t base;
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* prev_continuation_tptr;
} select_count_context_t;
static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base,
Eterm* ret)
{
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
return (match_res == am_true);
}
static int select_count_on_loop_ended(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp,
Eterm* ret)
{
select_count_context_t* ctx = (select_count_context_t*) ctx_base;
ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
BUMP_REDS(ctx->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left);
*ret = erts_make_integer(got, ctx->p);
return DB_ERROR_NONE;
}
static int select_count_on_trap(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
select_count_context_t* ctx = (select_count_context_t*) ctx_base;
return on_simple_trap(
&ets_select_count_continue_exp,
ctx->p,
ctx->tb,
ctx->tid,
ctx->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
select_count_context_t ctx;
Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS;
Sint chunk_size = 0;
ctx.base.on_nothing_can_match = select_count_on_nothing_can_match;
ctx.base.on_match_res = select_count_on_match_res;
ctx.base.on_loop_ended = select_count_on_loop_ended;
ctx.base.on_trap = select_count_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = NULL;
return match_traverse(
ctx.p, ctx.tb,
pattern, NULL,
chunk_size, iterations_left, NULL, 0,
&ctx.base, ret);
}
/*
* This is called when select_count traps
*/
static int db_select_count_continue_hash(Process* p, DbTable* tbl,
Eterm continuation, Eterm* ret)
{
select_count_context_t ctx;
Eterm* tptr;
Eterm tid;
Binary* mp;
Sint got;
Sint slot_ix;
Sint chunk_size = 0;
*ret = NIL;
if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
*ret = NIL;
return DB_ERROR_BADPARAM;
}
ctx.base.on_match_res = select_count_on_match_res;
ctx.base.on_loop_ended = select_count_on_loop_ended;
ctx.base.on_trap = select_count_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = tptr;
return match_traverse_continue(
ctx.p, ctx.tb, chunk_size,
MAX_SELECT_COUNT_ITERATIONS,
NULL, slot_ix, got, &mp, 0,
&ctx.base, ret);
}
#undef MAX_SELECT_COUNT_ITERATIONS
/*
*
* select_delete match traversal
*
*/
#define MAX_SELECT_DELETE_ITERATIONS 1000
typedef struct {
match_callbacks_t base;
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* prev_continuation_tptr;
erts_aint_t fixated_by_me;
Uint last_pseudo_delete;
HashDbTerm* free_us;
} select_delete_context_t;
static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base,
Eterm* ret)
{
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
HashDbTerm** current_ptr = *current_ptr_ptr;
select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
HashDbTerm* del;
if (match_res != am_true)
return 0;
if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */
if (slot_ix != ctx->last_pseudo_delete) {
if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me))
goto do_erase;
ctx->last_pseudo_delete = slot_ix;
}
(*current_ptr)->pseudo_deleted = 1;
}
else {
do_erase:
del = *current_ptr;
*current_ptr = (*current_ptr)->next; // replace pointer to term using next
del->next = ctx->free_us;
ctx->free_us = del;
}
erts_atomic_dec_nob(&ctx->tb->common.nitems);
return 1;
}
static int select_delete_on_loop_ended(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp,
Eterm* ret)
{
select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
free_term_list(ctx->tb, ctx->free_us);
ctx->free_us = NULL;
ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);
if (got) {
try_shrink(ctx->tb);
}
*ret = erts_make_integer(got, ctx->p);
return DB_ERROR_NONE;
}
static int select_delete_on_trap(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
free_term_list(ctx->tb, ctx->free_us);
ctx->free_us = NULL;
return on_simple_trap(
&ets_select_delete_continue_exp,
ctx->p,
ctx->tb,
ctx->tid,
ctx->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
select_delete_context_t ctx;
Sint chunk_size = 0;
ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match;
ctx.base.on_match_res = select_delete_on_match_res;
ctx.base.on_loop_ended = select_delete_on_loop_ended;
ctx.base.on_trap = select_delete_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = NULL;
ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
ctx.last_pseudo_delete = (Uint) -1;
ctx.free_us = NULL;
return match_traverse(
ctx.p, ctx.tb,
pattern, NULL,
chunk_size,
MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
&ctx.base, ret);
}
/*
* This is called when select_delete traps
*/
static int db_select_delete_continue_hash(Process* p, DbTable* tbl,
Eterm continuation, Eterm* ret)
{
select_delete_context_t ctx;
Eterm* tptr;
Eterm tid;
Binary* mp;
Sint got;
Sint slot_ix;
Sint chunk_size = 0;
if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
*ret = NIL;
return DB_ERROR_BADPARAM;
}
ctx.base.on_match_res = select_delete_on_match_res;
ctx.base.on_loop_ended = select_delete_on_loop_ended;
ctx.base.on_trap = select_delete_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = tptr;
ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */
ctx.last_pseudo_delete = (Uint) -1;
ctx.free_us = NULL;
return match_traverse_continue(
ctx.p, ctx.tb, chunk_size,
MAX_SELECT_DELETE_ITERATIONS,
NULL, slot_ix, got, &mp, 1,
&ctx.base, ret);
}
#undef MAX_SELECT_DELETE_ITERATIONS
/*
*
* select_replace match traversal
*
*/
#define MAX_SELECT_REPLACE_ITERATIONS 1000
typedef struct {
match_callbacks_t base;
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* prev_continuation_tptr;
} select_replace_context_t;
static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base,
Eterm* ret)
{
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
DbTableHash* tb = ctx->tb;
HashDbTerm* new;
HashDbTerm* next;
HashValue hval;
if (is_value(match_res)) {
#ifdef DEBUG
Eterm key = db_getkey(tb->common.keypos, match_res);
ASSERT(is_value(key));
ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
#endif
next = (**current_ptr_ptr)->next;
hval = (**current_ptr_ptr)->hvalue;
new = new_dbterm(tb, match_res);
new->next = next;
new->hvalue = hval;
new->pseudo_deleted = 0;
free_term(tb, **current_ptr_ptr);
**current_ptr_ptr = new; /* replace 'next' pointer in previous object */
*current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
return 1;
}
return 0;
}
static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
Sint got, Sint iterations_left,
Binary** mpp, Eterm* ret)
{
select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
/* the more objects we've replaced, the more reductions we've consumed */
BUMP_REDS(ctx->p,
MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
(MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got));
*ret = erts_make_integer(got, ctx->p);
return DB_ERROR_NONE;
}
static int select_replace_on_trap(match_callbacks_t* ctx_base,
Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
select_replace_context_t* ctx = (select_replace_context_t*) ctx_base;
return on_simple_trap(
&ets_select_replace_continue_exp,
ctx->p,
ctx->tb,
ctx->tid,
ctx->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
{
select_replace_context_t ctx;
Sint chunk_size = 0;
/* Bag implementation presented both semantic consistency and performance issues,
* unsupported for now
*/
ASSERT(!(tbl->hash.common.status & DB_BAG));
ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match;
ctx.base.on_match_res = select_replace_on_match_res;
ctx.base.on_loop_ended = select_replace_on_loop_ended;
ctx.base.on_trap = select_replace_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = NULL;
return match_traverse(
ctx.p, ctx.tb,
pattern, db_match_keeps_key,
chunk_size,
MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
&ctx.base, ret);
}
/*
* This is called when select_replace traps
*/
static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
{
select_replace_context_t ctx;
Eterm* tptr;
Eterm tid ;
Binary* mp;
Sint got;
Sint slot_ix;
Sint chunk_size = 0;
*ret = NIL;
if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
*ret = NIL;
return DB_ERROR_BADPARAM;
}
/* Proceed */
ctx.base.on_match_res = select_replace_on_match_res;
ctx.base.on_loop_ended = select_replace_on_loop_ended;
ctx.base.on_trap = select_replace_on_trap;
ctx.p = p;
ctx.tb = &tbl->hash;
ctx.tid = tid;
ctx.prev_continuation_tptr = tptr;
return match_traverse_continue(
ctx.p, ctx.tb, chunk_size,
MAX_SELECT_REPLACE_ITERATIONS,
NULL, slot_ix, got, &mp, 1,
&ctx.base, ret);
}
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp, *b;
HashDbTerm *free_us = NULL;
HashValue hval = MAKE_HASH(key);
erts_rwmtx_t *lck = WLOCK_HASH(tb, hval);
int ix = hash_to_ix(tb, hval);
int nitems_diff = 0;
*ret = NIL;
for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) {
if (has_live_key(tb, b, key, hval)) {
HashDbTerm *bend;
*ret = get_term_list(p, tb, key, hval, b, &bend);
while (b != bend) {
--nitems_diff;
if (nitems_diff == -1 && IS_FIXED(tb)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
bp = &b->next;
b->pseudo_deleted = 1;
b = b->next;
} else {
HashDbTerm* next = b->next;
b->next = free_us;
free_us = b;
b = *bp = next;
}
}
break;
}
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
free_term_list(tb, free_us);
return DB_ERROR_NONE;
}
/*
** Other interface routines (not directly coupled to one bif)
*/
void db_initialize_hash(void)
{
}
static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
const int LOOPS_PER_REDUCTION = 8;
DbTableHash *tb = &tbl->hash;
FixedDeletion* fixdel;
SWord loops = reds * LOOPS_PER_REDUCTION;
int i;
ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb));
fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel);
if (fixdel && fixdel->trap) {
/* Continue after trap */
ASSERT(fixdel->all);
ASSERT(fixdel->slot < NACTIVE(tb));
i = fixdel->slot;
}
else {
/* First call */
fixdel = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
link_fixdel(tb, fixdel, 0);
i = 0;
}
do {
HashDbTerm* b;
for (b = BUCKET(tb,i); b; b = b->next)
b->pseudo_deleted = 1;
} while (++i < NACTIVE(tb) && --loops > 0);
if (i < NACTIVE(tb)) {
/* Yield */
fixdel->slot = i;
fixdel->all = 0;
fixdel->trap = 1;
return -1;
}
fixdel->slot = NACTIVE(tb) - 1;
fixdel->all = 1;
fixdel->trap = 0;
erts_atomic_set_nob(&tb->common.nitems, 0);
return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
}
/* Display hash table contents (for dump) */
static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
DbHashStats stats;
int i;
erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));
i = tbl->common.is_thread_safe;
/* If crash dumping we set table to thread safe in order to
avoid taking any locks */
if (ERTS_IS_CRASH_DUMPING)
tbl->common.is_thread_safe = 1;
db_calc_stats_hash(&tbl->hash, &stats);
tbl->common.is_thread_safe = i;
erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len);
erts_print(to, to_arg, "Chain Length Std Dev: %f\n",
stats.std_dev_chain_len);
erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n",
stats.std_dev_expected);
if (IS_FIXED(tb))
erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items);
else
erts_print(to, to_arg, "Fixed: false\n");
if (show) {
for (i = 0; i < NACTIVE(tb); i++) {
HashDbTerm* list = BUCKET(tb,i);
if (list == NULL)
continue;
erts_print(to, to_arg, "%d: [", i);
while(list != 0) {
if (is_pseudo_deleted(list))
erts_print(to, to_arg, "*");
if (tb->common.compress) {
Eterm key = GETKEY(tb, list->dbterm.tpl);
erts_print(to, to_arg, "key=%T", key);
}
else {
Eterm obj = make_tuple(list->dbterm.tpl);
erts_print(to, to_arg, "%T", obj);
}
if (list->next != 0)
erts_print(to, to_arg, ",");
list = list->next;
}
erts_print(to, to_arg, "]\n");
}
}
}
static int db_free_empty_table_hash(DbTable *tbl)
{
ASSERT(NITEMS(tbl) == 0);
while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0)
;
return 0;
}
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
DbTableHash *tb = &tbl->hash;
FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel);
ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));
while (fixdel != NULL) {
FixedDeletion *fx = fixdel;
fixdel = fx->next;
erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
(void *) fx,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
if (--reds < 0) {
erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
return reds; /* Not done */
}
}
erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);
while(tb->nslots != 0) {
reds -= EXT_SEGSZ/64 + free_seg(tb, 1);
/*
* If we have done enough work, get out here.
*/
if (reds < 0) {
return reds; /* Not done */
}
}
if (tb->locks != NULL) {
int i;
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
erts_rwmtx_destroy(GET_LOCK(tb,i));
}
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
(void*)tb->locks, sizeof(DbTableHashFineLocks));
tb->locks = NULL;
}
ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
return reds; /* Done */
}
/*
** Utility routines. (static)
*/
/*
** For the select functions, analyzes the pattern and determines which
** slots should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
Eterm *ptpl;
Eterm lst, tpl, ttpl;
Eterm *matches,*guards, *bodies;
Eterm sbuff[30];
Eterm *buff = sbuff;
Eterm key = NIL;
HashValue hval = NIL;
int num_heads = 0;
int i;
mpi->lists = mpi->dlists;
mpi->num_lists = 0;
mpi->key_given = 1;
mpi->something_can_match = 0;
mpi->mp = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
++num_heads;
if (lst != NIL) {/* proper list... */
return DB_ERROR_BADPARAM;
}
if (num_heads > 10) {
buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
sizeof(*(mpi->lists)) * num_heads);
}
matches = buff;
guards = buff + num_heads;
bodies = buff + (num_heads * 2);
i = 0;
for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
Eterm match;
Eterm guard;
Eterm body;
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
ptpl = tuple_val(ttpl);
if (ptpl[0] != make_arityval(3U)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
matches[i] = match = tpl = ptpl[1];
guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
if (!is_list(body) || CDR(list_val(body)) != NIL ||
CAR(list_val(body)) != am_DollarUnderscore) {
}
++i;
if (!(mpi->key_given)) {
continue;
}
if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
(mpi->key_given) = 0;
(mpi->something_can_match) = 1;
} else {
key = db_getkey(tb->common.keypos, tpl);
if (is_value(key)) {
if (!db_has_variable(key)) { /* Bound key */
int ix, search_slot;
HashDbTerm** bp;
erts_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb,ix);
if (lck == NULL) {
search_slot = search_list(tb,key,hval,*bp) != NULL;
} else {
/* No point to verify if key exist now as there may be
concurrent inserters/deleters anyway */
RUNLOCK_HASH(lck);
search_slot = 1;
}
if (search_slot) {
int j;
for (j=0; ; ++j) {
if (j == mpi->num_lists) {
mpi->lists[mpi->num_lists].bucket = bp;
mpi->lists[mpi->num_lists].ix = ix;
++mpi->num_lists;
break;
}
if (mpi->lists[j].bucket == bp) {
ASSERT(mpi->lists[j].ix == ix);
break;
}
ASSERT(mpi->lists[j].ix != ix);
}
mpi->something_can_match = 1;
}
} else {
mpi->key_given = 0;
mpi->something_can_match = 1;
}
}
}
}
/*
* It would be nice not to compile the match_spec if nothing could match,
* but then the select calls would not fail like they should on bad
* match specs that happen to specify non existent keys etc.
*/
if ((mpi->mp = db_match_compile(matches, guards, bodies,
num_heads, DCOMP_TABLE, NULL))
== NULL) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_NONE;
}
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
struct segment** old_segtab = SEGTAB(tb);
int nsegs = 0;
struct ext_segtab* est;
ASSERT(seg_ix >= NSEG_1);
switch (seg_ix) {
case NSEG_1: nsegs = NSEG_2; break;
default: nsegs = seg_ix + NSEG_INC; break;
}
ASSERT(nsegs > tb->nsegs);
est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_EXT_SEGTAB(nsegs));
est->nsegs = nsegs;
est->prev_segtab = old_segtab;
est->prev_nsegs = tb->nsegs;
sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
return est;
}
/* Extend table with one new segment
*/
static void alloc_seg(DbTableHash *tb)
{
int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
struct segment** segtab;
ASSERT(seg_ix > 0);
if (seg_ix == tb->nsegs) { /* New segtab needed */
struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
SET_SEGTAB(tb, est->segtab);
tb->nsegs = est->nsegs;
}
ASSERT(seg_ix < tb->nsegs);
segtab = SEGTAB(tb);
segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_SEGMENT(EXT_SEGSZ));
sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ));
tb->nslots += EXT_SEGSZ;
}
static void dealloc_ext_segtab(void* lop_data)
{
struct ext_segtab* est = (struct ext_segtab*) lop_data;
erts_free(ERTS_ALC_T_DB_SEG, est);
}
/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty
*/
static int free_seg(DbTableHash *tb, int free_records)
{
const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
struct segment** const segtab = SEGTAB(tb);
struct segment* const segp = segtab[seg_ix];
Uint seg_sz;
int nrecords = 0;
ASSERT(segp != NULL);
#ifndef DEBUG
if (free_records)
#endif
{
int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
while (i--) {
HashDbTerm* p = segp->buckets[i];
while(p != 0) {
HashDbTerm* nxt = p->next;
ASSERT(free_records); /* segment not empty as assumed? */
free_term(tb, p);
p = nxt;
++nrecords;
}
}
}
if (seg_ix >= NSEG_1) {
struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);
if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
ASSERT(est->prev_segtab != NULL);
SET_SEGTAB(tb, est->prev_segtab);
tb->nsegs = est->prev_nsegs;
if (!tb->common.is_thread_safe) {
/*
* Table is doing a graceful shrink operation and we must avoid
* deallocating this segtab while it may still be read by other
* threads. Schedule deallocation with thread progress to make
* sure no lingering threads are still hanging in BUCKET macro
* with an old segtab pointer.
*/
Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
est,
&est->lop,
sz);
}
else
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
SIZEOF_EXT_SEGTAB(est->nsegs));
}
}
seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));
#ifdef DEBUG
if (seg_ix < tb->nsegs)
SEGTAB(tb)[seg_ix] = NULL;
#endif
tb->nslots -= seg_sz;
ASSERT(tb->nslots >= 0);
return nrecords;
}
/*
** Copy terms from ptr1 until ptr2
** works for ptr1 == ptr2 == 0 => []
** or ptr2 == 0
** sz is either precalculated heap size or 0 if not known
*/
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash* tb)
{
HashDbTerm* ptr;
Eterm list = NIL;
Eterm copy;
Eterm *hp, *hend;
if (!sz) {
ptr = ptr1;
while(ptr != ptr2) {
if (!is_pseudo_deleted(ptr))
sz += ptr->dbterm.size + 2;
ptr = ptr->next;
}
}
hp = HAlloc(p, sz);
hend = hp + sz;
ptr = ptr1;
while(ptr != ptr2) {
if (!is_pseudo_deleted(ptr)) {
copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p));
list = CONS(hp, copy, list);
hp += 2;
}
ptr = ptr->next;
}
HRelease(p,hend,hp);
return list;
}
static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
if (DB_USING_FINE_LOCKING(tb))
return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
else
ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
return 1;
}
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
if (DB_USING_FINE_LOCKING(tb))
erts_atomic_set_relb(&tb->is_resizing, 0);
}
/* Grow table with one or more new buckets.
** Allocate new segment if needed.
*/
static void grow(DbTableHash* tb, int nitems)
{
HashDbTerm** pnext;
HashDbTerm** to_pnext;
HashDbTerm* p;
erts_rwmtx_t* lck;
int nactive;
int from_ix, to_ix;
int szm;
int loop_limit = 5;
do {
if (!begin_resizing(tb))
return; /* already in progress */
nactive = NACTIVE(tb);
if (nitems <= GROW_LIMIT(nactive)) {
goto abort; /* already done (race) */
}
/* Ensure that the slot nactive exists */
if (nactive == tb->nslots) {
/* Time to get a new segment */
ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
alloc_seg(tb);
}
ASSERT(nactive < tb->nslots);
szm = erts_atomic_read_nob(&tb->szm);
if (nactive <= szm) {
from_ix = nactive & (szm >> 1);
} else {
ASSERT(nactive == szm+1);
from_ix = 0;
szm = (szm<<1) | 1;
}
to_ix = nactive;
lck = WLOCK_HASH(tb, from_ix);
ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
/* Now a final double check (with the from_ix lock held)
* that we did not get raced by a table fixer.
*/
if (IS_FIXED(tb)) {
WUNLOCK_HASH(lck);
goto abort;
}
erts_atomic_set_nob(&tb->nactive, ++nactive);
if (from_ix == 0) {
if (DB_USING_FINE_LOCKING(tb))
erts_atomic_set_relb(&tb->szm, szm);
else
erts_atomic_set_nob(&tb->szm, szm);
}
done_resizing(tb);
/* Finally, let's split the bucket. We try to do it in a smart way
to keep link order and avoid unnecessary updates of next-pointers */
pnext = &BUCKET(tb, from_ix);
p = *pnext;
to_pnext = &BUCKET(tb, to_ix);
while (p != NULL) {
if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */
*pnext = p->next;
free_term(tb, p);
p = *pnext;
}
else {
int ix = p->hvalue & szm;
if (ix != from_ix) {
ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
*to_pnext = p;
/* Swap "from" and "to": */
from_ix = ix;
to_pnext = pnext;
}
pnext = &p->next;
p = *pnext;
}
}
*to_pnext = NULL;
WUNLOCK_HASH(lck);
}while (--loop_limit && nitems > GROW_LIMIT(nactive));
return;
abort:
done_resizing(tb);
}
/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
*/
static void shrink(DbTableHash* tb, int nitems)
{
HashDbTerm** src_bp;
HashDbTerm** dst_bp;
HashDbTerm** bp;
erts_rwmtx_t* lck;
int src_ix, dst_ix, low_szm;
int nactive;
int loop_limit = 5;
do {
if (!begin_resizing(tb))
return; /* already in progress */
nactive = NACTIVE(tb);
if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
goto abort; /* already done (race) */
}
src_ix = nactive - 1;
low_szm = erts_atomic_read_nob(&tb->szm) >> 1;
dst_ix = src_ix & low_szm;
ASSERT(dst_ix < src_ix);
ASSERT(nactive > FIRST_SEGSZ);
lck = WLOCK_HASH(tb, dst_ix);
ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
/* Double check for racing table fixers */
if (IS_FIXED(tb)) {
WUNLOCK_HASH(lck);
goto abort;
}
src_bp = &BUCKET(tb, src_ix);
dst_bp = &BUCKET(tb, dst_ix);
bp = src_bp;
/*
* We join lists by appending "dst" at the end of "src"
* as we must step through "src" anyway to purge pseudo deleted.
*/
while(*bp != NULL) {
if (is_pseudo_deleted(*bp)) {
HashDbTerm* deleted = *bp;
*bp = deleted->next;
free_term(tb, deleted);
} else {
bp = &(*bp)->next;
}
}
*bp = *dst_bp;
*dst_bp = *src_bp;
*src_bp = NULL;
nactive = src_ix;
erts_atomic_set_nob(&tb->nactive, nactive);
if (dst_ix == 0) {
erts_atomic_set_relb(&tb->szm, low_szm);
}
WUNLOCK_HASH(lck);
if (tb->nslots - src_ix >= EXT_SEGSZ) {
free_seg(tb, 0);
}
done_resizing(tb);
} while (--loop_limit
&& nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
return;
abort:
done_resizing(tb);
}
/* Search a list of tuples for a matching key */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list)
{
while (list != 0) {
if (has_live_key(tb,list,key,hval))
return list;
list = list->next;
}
return 0;
}
/* This function is called by the next AND the select BIF */
/* It return the next live object in a table, NULL if no more */
/* In-bucket: RLOCKED */
/* Out-bucket: RLOCKED unless NULL */
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
HashDbTerm *list)
{
int i;
ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));
for ( ; list != NULL; list = list->next) {
if (!is_pseudo_deleted(list))
return list;
}
i = *iptr;
while ((i=next_slot(tb, i, lck_ptr)) != 0) {
list = BUCKET(tb,i);
while (list != NULL) {
if (!is_pseudo_deleted(list)) {
*iptr = i;
return list;
}
list = list->next;
}
}
/* *iptr = ??? */
return NULL;
}
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle* handle)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
HashDbTerm **bp, *b;
erts_rwmtx_t* lck;
int flags = 0;
ASSERT(tb->common.status & DB_SET);
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb, hval);
bp = &BUCKET(tb, hash_to_ix(tb, hval));
b = *bp;
for (;;) {
if (b == NULL) {
break;
}
if (has_key(tb, b, key, hval)) {
if (!is_pseudo_deleted(b)) {
goto Ldone;
}
break;
}
bp = &b->next;
b = *bp;
}
if (obj == THE_NON_VALUE) {
WUNLOCK_HASH(lck);
return 0;
}
{
Eterm *objp = tuple_val(obj);
int arity = arityval(*objp);
Eterm *htop, *hend;
ASSERT(arity >= tb->common.keypos);
htop = HAlloc(p, arity + 1);
hend = htop + arity + 1;
sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
htop[tb->common.keypos] = key;
obj = make_tuple(htop);
if (b == NULL) {
HashDbTerm *q = new_dbterm(tb, obj);
q->hvalue = hval;
q->pseudo_deleted = 0;
q->next = NULL;
*bp = b = q;
flags |= DB_INC_TRY_GROW;
} else {
HashDbTerm *q, *next = b->next;
ASSERT(is_pseudo_deleted(b));
q = replace_dbterm(tb, b, obj);
q->next = next;
ASSERT(q->hvalue == hval);
q->pseudo_deleted = 0;
*bp = b = q;
erts_atomic_inc_nob(&tb->common.nitems);
}
HRelease(p, hend, htop);
flags |= DB_NEW_OBJECT;
}
Ldone:
handle->tb = tbl;
handle->bp = (void **)bp;
handle->dbterm = &b->dbterm;
handle->flags = flags;
handle->new_size = b->dbterm.size;
handle->lck = lck;
return 1;
}
/* Must be called after call to db_lookup_dbterm
*/
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
{
DbTable* tbl = handle->tb;
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp = (HashDbTerm **) handle->bp;
HashDbTerm *b = *bp;
erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
HashDbTerm* free_me = NULL;
ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */
ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));
if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
0)) {
b->pseudo_deleted = 1;
} else {
*bp = b->next;
free_me = b;
}
WUNLOCK_HASH(lck);
erts_atomic_dec_nob(&tb->common.nitems);
try_shrink(tb);
} else {
if (handle->flags & DB_MUST_RESIZE) {
db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
free_me = b;
}
if (handle->flags & DB_INC_TRY_GROW) {
int nactive;
int nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
nactive = NACTIVE(tb);
if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
grow(tb, nitems);
}
} else {
WUNLOCK_HASH(lck);
}
}
if (free_me)
free_term(tb, free_me);
#ifdef DEBUG
handle->dbterm = 0;
#endif
return;
}
static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
if (IS_FIXED(tbl)) {
reds = db_mark_all_deleted_hash(tbl, reds);
} else {
reds = db_free_table_continue_hash(tbl, reds);
if (reds < 0)
return reds;
db_create_hash(p, tbl);
erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
}
return reds;
}
void db_foreach_offheap_hash(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void * arg)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm* list;
int i;
int nactive = NACTIVE(tb);
for (i = 0; i < nactive; i++) {
list = BUCKET(tb,i);
while(list != 0) {
ErlOffHeap tmp_offheap;
tmp_offheap.first = list->dbterm.first_oh;
tmp_offheap.overhead = 0;
(*func)(&tmp_offheap, arg);
list->dbterm.first_oh = tmp_offheap.first;
list = list->next;
}
}
}
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
HashDbTerm* b;
erts_rwmtx_t* lck;
int sum = 0;
int sq_sum = 0;
int kept_items = 0;
int ix;
int len;
stats->min_chain_len = INT_MAX;
stats->max_chain_len = 0;
ix = 0;
lck = RLOCK_HASH(tb,ix);
do {
len = 0;
for (b = BUCKET(tb,ix); b!=NULL; b=b->next) {
len++;
if (is_pseudo_deleted(b))
++kept_items;
}
sum += len;
sq_sum += len*len;
if (len < stats->min_chain_len) stats->min_chain_len = len;
if (len > stats->max_chain_len) stats->max_chain_len = len;
ix = next_slot(tb,ix,&lck);
}while (ix);
stats->avg_chain_len = (float)sum / NACTIVE(tb);
stats->std_dev_chain_len = sqrt((sq_sum - stats->avg_chain_len*sum) / NACTIVE(tb));
/* Expected standard deviation from a good uniform hash function,
ie binomial distribution (not taking the linear hashing into acount) */
stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
stats->kept_items = kept_items;
}
/* For testing only */
Eterm erts_ets_hash_sizeof_ext_segtab(void)
{
return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
}
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
int i;
if(tb->locks == NULL) {
return;
}
for(i = 0; i < DB_HASH_LOCK_CNT; i++) {
erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt;
if(enable) {
erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
} else {
erts_lcnt_uninstall(ref);
}
}
}
#endif /* ERTS_ENABLE_LOCK_COUNT */