/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1998-2016. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
** Implementation of unordered ETS tables.
** The tables are implemented as linear dynamic hash tables.
*/
/* SMP:
** The hash table supports two different locking "modes",
** coarse grained and fine grained locking.
**
** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
** the right kind of lock on the entire table depending on operation (reading
** or writing). No further locking is then done by the table itself.
**
** Fine grained locking is supported by this code to allow concurrent updates
** (and reading) to different parts of the table. This works by keeping one
** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
** rehashing buckets can be done without exclusive table lock.
**
** A table will support fine grained locking if it is created with flag
** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
** if operations need to obtain fine grained locks or not. Some operations
** will for example always use exclusive table lock to guarantee
** a higher level of atomicity.
*/
/* FIXATION:
** Fixating the table, by ets:safe_fixtable or as done by select-operations,
** guarantees two things in current implementation.
** (1) Keys will not *totaly* disappear from the table. A key can thus be used
** as an iterator to find the next key in iteration sequence. Note however
** that this does not mean that (pointers to) table objects are guaranteed
** to be maintained while the table is fixated. A BAG or DBAG may actually
** remove objects as long as there is at least one object left in the table
** with the same key (alive or pseudo-deleted).
** (2) Objects will not be moved between buckets due to table grow/shrink.
** This will guarantee that iterations do not miss keys or get double-hits.
**
** With fine grained locking, a concurrent thread can fixate the table at any
** time. A "dangerous" operation (delete or move) therefore needs to check
** if the table is fixated while write-locking the bucket.
*/
/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "export.h"
#include "erl_binary.h"
#include "erl_db_hash.h"
/*
* The following symbols can be manipulated to "tune" the linear hash array
*/
#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
/*
** We want the first mandatory segment to be small (to reduce minimal footprint)
** and larger extra segments (to reduce number of alloc/free calls).
*/
/* Number of slots in first segment */
#define FIRST_SEGSZ_EXP 8
#define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP)
#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
/* Number of slots per extra segment */
#define EXT_SEGSZ_EXP 11
#define EXT_SEGSZ (1 << EXT_SEGSZ_EXP)
#define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
#define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
#define NSEG_2 256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */
#ifdef ERTS_SMP
# define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)
#else
# define DB_USING_FINE_LOCKING(TB) 0
#endif
#ifdef ETHR_ORDERED_READ_DEPEND
#define SEGTAB(tb) ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab))
#else
#define SEGTAB(tb) \
(DB_USING_FINE_LOCKING(tb) \
? ((struct segment**) erts_smp_atomic_read_ddrb(&(tb)->segtab)) \
: ((struct segment**) erts_smp_atomic_read_nob(&(tb)->segtab)))
#endif
#define NACTIVE(tb) ((int)erts_smp_atomic_read_nob(&(tb)->nactive))
#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems))
#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
/*
* When deleting a table, the number of records to delete.
* Approximate number, because we must delete entire buckets.
*/
#define DELETE_RECORD_LIMIT 10000
/* Calculate slot index from hash value.
** RLOCK_HASH or WLOCK_HASH must be done before.
*/
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
Uint mask = (DB_USING_FINE_LOCKING(tb)
? erts_smp_atomic_read_acqb(&tb->szm)
: erts_smp_atomic_read_nob(&tb->szm));
Uint ix = hval & mask;
if (ix >= erts_smp_atomic_read_nob(&tb->nactive)) {
ix &= mask>>1;
ASSERT(ix < erts_smp_atomic_read_nob(&tb->nactive));
}
return ix;
}
/* Remember a slot containing a pseudo-deleted item (INVALID_HASH)
* Return false if we got raced by unfixing thread
* and the object should be deleted for real.
*/
static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix,
erts_aint_t fixated_by_me)
{
erts_aint_t was_next;
erts_aint_t exp_next;
FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
fixd->slot = ix;
was_next = erts_smp_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic insertion in linked list: */
if (NFIXED(tb) <= fixated_by_me) {
erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
fixd, sizeof(FixedDeletion));
return 0; /* raced by unfixer */
}
exp_next = was_next;
fixd->next = (FixedDeletion*) exp_next;
was_next = erts_smp_atomic_cmpxchg_mb(&tb->fixdel,
(erts_aint_t) fixd,
exp_next);
}while (was_next != exp_next);
return 1;
}
#define MAX_HASH 0xEFFFFFFFUL
#define INVALID_HASH 0xFFFFFFFFUL
/* optimised version of make_hash (normal case? atomic key) */
#define MAKE_HASH(term) \
((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
make_internal_hash(term)) % MAX_HASH)
#ifdef ERTS_SMP
# define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
# define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
# define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
/* Fine grained read lock */
static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
erts_smp_rwmtx_rlock(lck);
return lck;
}
}
/* Fine grained write lock */
static ERTS_INLINE erts_smp_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
{
if (tb->common.is_thread_safe) {
return NULL;
} else {
erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval);
ASSERT(tb->common.type & DB_FINE_LOCKED);
erts_smp_rwmtx_rwlock(lck);
return lck;
}
}
static ERTS_INLINE void RUNLOCK_HASH(erts_smp_rwmtx_t* lck)
{
if (lck != NULL) {
erts_smp_rwmtx_runlock(lck);
}
}
static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck)
{
if (lck != NULL) {
erts_smp_rwmtx_rwunlock(lck);
}
}
#else /* ERTS_SMP */
# define RLOCK_HASH(tb,hval) NULL
# define WLOCK_HASH(tb,hval) NULL
# define RUNLOCK_HASH(lck) ((void)lck)
# define WUNLOCK_HASH(lck) ((void)lck)
#endif /* ERTS_SMP */
#ifdef ERTS_ENABLE_LOCK_CHECK
# define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
# define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
# define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rwlocked(lck))
# define IS_TAB_WLOCKED(tb) erts_smp_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
#else
# define IS_HASH_RLOCKED(tb,hval) (1)
# define IS_HASH_WLOCKED(tb,hval) (1)
# define IS_TAB_WLOCKED(tb) (1)
#endif
/* Iteration helper
** Returns "next" slot index or 0 if EOT reached.
** Slot READ locks updated accordingly, unlocked if EOT.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
erts_smp_rwmtx_t** lck_ptr)
{
#ifdef ERTS_SMP
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
RUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix);
return ix;
#else
return (++ix < NACTIVE(tb)) ? ix : 0;
#endif
}
/* Same as next_slot but with WRITE locking */
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
erts_smp_rwmtx_t** lck_ptr)
{
#ifdef ERTS_SMP
ix += DB_HASH_LOCK_CNT;
if (ix < NACTIVE(tb)) return ix;
WUNLOCK_HASH(*lck_ptr);
ix = (ix + 1) & DB_HASH_LOCK_MASK;
if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix);
return ix;
#else
return next_slot(tb,ix,lck_ptr);
#endif
}
#ifndef MIN
#define MIN(X, Y) ((X) < (Y) ? (X) : (Y))
#endif
/*
* Some special binary flags
*/
#define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm));
}
/*
* Local types
*/
struct mp_prefound {
HashDbTerm** bucket;
int ix;
};
struct mp_info {
int all_objects; /* True if complete objects are always
* returned from the match_spec (can use
* copy_shallow on the return value) */
int something_can_match; /* The match_spec is not "impossible" */
int key_given;
struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */
struct mp_prefound* lists; /* Buckets to search if keys are given,
* = dlists initially */
unsigned num_lists; /* Number of elements in "lists",
* = 0 initially */
Binary *mp; /* The compiled match program */
};
/* A table segment */
struct segment {
HashDbTerm* buckets[1];
};
#define SIZEOF_SEGMENT(N) \
(offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
/* An extended segment table */
struct ext_segtab {
#ifdef ERTS_SMP
ErtsThrPrgrLaterOp lop;
#endif
struct segment** prev_segtab; /* Used when table is shrinking */
int prev_nsegs; /* Size of prev_segtab */
int nsegs; /* Size of this segtab */
struct segment* segtab[1]; /* The segment table */
};
#define SIZEOF_EXT_SEGTAB(NSEGS) \
(offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
struct segment** segtab)
{
if (DB_USING_FINE_LOCKING(tb))
erts_smp_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
else
erts_smp_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
}
/* Used by select_replace on analyze_pattern */
typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
/*
** Forward decl's (static functions)
*/
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
static void alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list);
static void shrink(DbTableHash* tb, int nitems);
static void grow(DbTableHash* tb, int nitems);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash*);
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
/*
* Method interface functions
*/
static int db_first_hash(Process *p,
DbTable *tbl,
Eterm *ret);
static int db_next_hash(Process *p,
DbTable *tbl,
Eterm key,
Eterm *ret);
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_hash(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret);
static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_hash(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret);
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret);
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret);
static int db_select_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret);
static int db_select_count_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret);
static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_select_replace_hash(Process *p, DbTable *tbl,
Eterm pattern, Eterm *ret);
static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
static void db_print_hash(fmtfn_t to,
void *to_arg,
int show,
DbTable *tbl);
static int db_free_table_hash(DbTable *tbl);
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
static void db_foreach_offheap_hash(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
static int db_delete_all_objects_hash(Process* p, DbTable* tbl);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle* handle);
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
int nactive = NACTIVE(tb);
int nitems = NITEMS(tb);
if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
&& !IS_FIXED(tb)) {
shrink(tb, nitems);
}
}
/* Is this a live object (not pseodo-deleted) with the specified key?
*/
static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
if (b->hvalue != hval) return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
return EQ(key, itemKey);
}
}
/* Has this object the specified key? Can be pseudo-deleted.
*/
static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
Eterm key, HashValue hval)
{
if (b->hvalue != hval && b->hvalue != INVALID_HASH) return 0;
else {
Eterm itemKey = GETKEY(tb, b->dbterm.tpl);
ASSERT(!is_header(itemKey));
return EQ(key, itemKey);
}
}
static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
{
HashDbTerm* p;
if (tb->common.compress) {
p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
}
else {
p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj);
}
return p;
}
static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
Eterm obj)
{
HashDbTerm* ret;
ASSERT(old != NULL);
if (tb->common.compress) {
ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
}
else {
ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj);
}
return ret;
}
/*
** External interface
*/
DbTableMethod db_hash =
{
db_create_hash,
db_first_hash,
db_next_hash,
db_first_hash, /* last == first */
db_next_hash, /* prev == next */
db_put_hash,
db_get_hash,
db_get_element_hash,
db_member_hash,
db_erase_hash,
db_erase_object_hash,
db_slot_hash,
db_select_chunk_hash,
db_select_hash,
db_select_delete_hash,
db_select_continue_hash, /* hmm continue_hash? */
db_select_delete_continue_hash,
db_select_count_hash,
db_select_count_continue_hash,
db_select_replace_hash,
db_select_replace_continue_hash,
db_take_hash,
db_delete_all_objects_hash,
db_free_table_hash,
db_free_table_continue_hash,
db_print_hash,
db_foreach_offheap_hash,
db_lookup_dbterm_hash,
db_finalize_dbterm_hash
};
#ifdef DEBUG
/* Wait a while to provoke race and get code coverage */
static void DEBUG_WAIT(void)
{
unsigned long spin = 1UL << 20;
while (--spin);
}
#else
# define DEBUG_WAIT()
#endif
/* Rare case of restoring the rest of the fixdel list
when "unfixer" gets interrupted by "fixer" */
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
/*int tries = 0;*/
DEBUG_WAIT();
if (erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
(erts_aint_t) NULL) != (erts_aint_t) NULL) {
/* Oboy, must join lists */
FixedDeletion* last = fixdel;
erts_aint_t was_tail;
erts_aint_t exp_tail;
while (last->next != NULL) last = last->next;
was_tail = erts_smp_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic list insertion */
exp_tail = was_tail;
last->next = (FixedDeletion*) exp_tail;
/*++tries;*/
DEBUG_WAIT();
was_tail = erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
(erts_aint_t) fixdel,
exp_tail);
}while (was_tail != exp_tail);
}
/*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
/*
** Table interface routines ie what's called by the bif's
*/
SWord db_unfix_table_hash(DbTableHash *tb)
{
FixedDeletion* fixdel;
SWord work = 0;
ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
|| (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock)
&& !tb->common.is_thread_safe));
restart:
fixdel = (FixedDeletion*) erts_smp_atomic_xchg_mb(&tb->fixdel,
(erts_aint_t) NULL);
while (fixdel != NULL) {
FixedDeletion *fx = fixdel;
int ix = fx->slot;
HashDbTerm **bp;
HashDbTerm *b;
erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,ix);
if (IS_FIXED(tb)) { /* interrupted by fixer */
WUNLOCK_HASH(lck);
restore_fixdel(tb,fixdel);
if (!IS_FIXED(tb)) {
goto restart; /* unfixed again! */
}
return work;
}
if (ix < NACTIVE(tb)) {
bp = &BUCKET(tb, ix);
b = *bp;
while (b != NULL) {
if (b->hvalue == INVALID_HASH) {
*bp = b->next;
free_term(tb, b);
work++;
b = *bp;
} else {
bp = &b->next;
b = b->next;
}
}
}
/* else slot has been joined and purged by shrink() */
WUNLOCK_HASH(lck);
fixdel = fx->next;
erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
(void *) fx,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
work++;
}
/* ToDo: Maybe try grow/shrink the table as well */
return work;
}
int db_create_hash(Process *p, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
erts_smp_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
erts_smp_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
erts_smp_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
erts_smp_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
SET_SEGTAB(tb, tb->first_segtab);
tb->nsegs = NSEG_1;
tb->nslots = FIRST_SEGSZ;
tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_SEGMENT(FIRST_SEGSZ));
sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));
#ifdef ERTS_SMP
erts_smp_atomic_init_nob(&tb->is_resizing, 0);
if (tb->common.type & DB_FINE_LOCKED) {
erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
int i;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_SMP_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
tb->locks = (DbTableHashFineLocks*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
(DbTable *) tb,
sizeof(DbTableHashFineLocks));
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
erts_smp_rwmtx_init_opt_x(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
"db_hash_slot", make_small(i));
}
/* This important property is needed to guarantee the two buckets
* involved in a grow/shrink operation it protected by the same lock:
*/
ASSERT(erts_smp_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
}
else { /* coarse locking */
tb->locks = NULL;
}
ERTS_THR_MEMORY_BARRIER;
#endif /* ERST_SMP */
return DB_ERROR_NONE;
}
static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
Uint ix = 0;
erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix);
HashDbTerm* list;
for (;;) {
list = BUCKET(tb,ix);
if (list != NULL) {
if (list->hvalue == INVALID_HASH) {
list = next(tb,&ix,&lck,list);
}
break;
}
if ((ix=next_slot(tb,ix,&lck)) == 0) {
list = NULL;
break;
}
}
if (list != NULL) {
*ret = db_copy_key(p, tbl, &list->dbterm);
RUNLOCK_HASH(lck);
}
else {
*ret = am_EOT;
}
return DB_ERROR_NONE;
}
static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
Uint ix;
HashDbTerm* b;
erts_smp_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
b = BUCKET(tb, ix);
for (;;) {
if (b == NULL) {
RUNLOCK_HASH(lck);
return DB_ERROR_BADKEY;
}
if (has_key(tb, b, key, hval)) {
break;
}
b = b->next;
}
/* Key found */
b = next(tb, &ix, &lck, b);
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b != 0) {
if (!has_live_key(tb, b, key, hval)) {
break;
}
b = next(tb, &ix, &lck, b);
}
}
if (b == NULL) {
*ret = am_EOT;
}
else {
*ret = db_copy_key(p, tbl, &b->dbterm);
RUNLOCK_HASH(lck);
}
return DB_ERROR_NONE;
}
int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
Eterm key;
HashDbTerm** bp;
HashDbTerm* b;
HashDbTerm* q;
erts_smp_rwmtx_t* lck;
int nitems;
int ret = DB_ERROR_NONE;
key = GETKEY(tb, tuple_val(obj));
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb, hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
for (;;) {
if (b == NULL) {
goto Lnew;
}
if (has_key(tb,b,key,hval)) {
break;
}
bp = &b->next;
b = b->next;
}
/* Key found
*/
if (tb->common.status & DB_SET) {
HashDbTerm* bnext = b->next;
if (b->hvalue == INVALID_HASH) {
erts_smp_atomic_inc_nob(&tb->common.nitems);
}
else if (key_clash_fail) {
ret = DB_ERROR_BADKEY;
goto Ldone;
}
q = replace_dbterm(tb, b, obj);
q->next = bnext;
q->hvalue = hval; /* In case of INVALID_HASH */
*bp = q;
goto Ldone;
}
else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
q = b;
do {
if (q->hvalue != INVALID_HASH) {
ret = DB_ERROR_BADKEY;
goto Ldone;
}
q = q->next;
}while (q != NULL && has_key(tb,q,key,hval));
}
else if (tb->common.status & DB_BAG) {
HashDbTerm** qp = bp;
q = b;
do {
if (db_eq(&tb->common,obj,&q->dbterm)) {
if (q->hvalue == INVALID_HASH) {
erts_smp_atomic_inc_nob(&tb->common.nitems);
q->hvalue = hval;
if (q != b) { /* must move to preserve key insertion order */
*qp = q->next;
q->next = b;
*bp = q;
}
}
goto Ldone;
}
qp = &q->next;
q = *qp;
}while (q != NULL && has_key(tb,q,key,hval));
}
/*else DB_DUPLICATE_BAG */
Lnew:
q = new_dbterm(tb, obj);
q->hvalue = hval;
q->next = b;
*bp = q;
nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
{
int nactive = NACTIVE(tb);
if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
grow(tb, nitems);
}
}
return DB_ERROR_NONE;
Ldone:
WUNLOCK_HASH(lck);
return ret;
}
static Eterm
get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
HashDbTerm *b1, HashDbTerm **bend)
{
HashDbTerm* b2 = b1->next;
Eterm copy;
Uint sz = b1->dbterm.size + 2;
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
while (b2 && has_key(tb, b2, key, hval)) {
if (b2->hvalue != INVALID_HASH)
sz += b2->dbterm.size + 2;
b2 = b2->next;
}
}
copy = build_term_list(p, b1, b2, sz, tb);
if (bend) {
*bend = b2;
}
return copy;
}
int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b;
erts_smp_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
b = BUCKET(tb, ix);
while(b != 0) {
if (has_live_key(tb, b, key, hval)) {
*ret = get_term_list(p, tb, key, hval, b, NULL);
goto done;
}
b = b->next;
}
*ret = NIL;
done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b1;
erts_smp_rwmtx_t* lck;
hval = MAKE_HASH(key);
ix = hash_to_ix(tb, hval);
lck = RLOCK_HASH(tb, hval);
b1 = BUCKET(tb, ix);
while(b1 != 0) {
if (has_live_key(tb,b1,key,hval)) {
*ret = am_true;
goto done;
}
b1 = b1->next;
}
*ret = am_false;
done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
static int db_get_element_hash(Process *p, DbTable *tbl,
Eterm key,
int ndex,
Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm* b1;
erts_smp_rwmtx_t* lck;
int retval;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb, hval);
ix = hash_to_ix(tb, hval);
b1 = BUCKET(tb, ix);
while(b1 != 0) {
if (has_live_key(tb,b1,key,hval)) {
if (ndex > arityval(b1->dbterm.tpl[0])) {
retval = DB_ERROR_BADITEM;
goto done;
}
if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
HashDbTerm* b;
HashDbTerm* b2 = b1->next;
Eterm elem_list = NIL;
while(b2 != NULL && has_key(tb,b2,key,hval)) {
if (ndex > arityval(b2->dbterm.tpl[0])
&& b2->hvalue != INVALID_HASH) {
retval = DB_ERROR_BADITEM;
goto done;
}
b2 = b2->next;
}
b = b1;
while(b != b2) {
if (b->hvalue != INVALID_HASH) {
Eterm *hp;
Eterm copy = db_copy_element_from_ets(&tb->common, p,
&b->dbterm, ndex, &hp, 2);
elem_list = CONS(hp, copy, elem_list);
}
b = b->next;
}
*ret = elem_list;
}
else {
Eterm* hp;
*ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
}
retval = DB_ERROR_NONE;
goto done;
}
b1 = b1->next;
}
retval = DB_ERROR_BADKEY;
done:
RUNLOCK_HASH(lck);
return retval;
}
/*
** NB, this is for the db_erase/2 bif.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm** bp;
HashDbTerm* b;
erts_smp_rwmtx_t* lck;
int nitems_diff = 0;
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
while(b != 0) {
if (has_live_key(tb,b,key,hval)) {
--nitems_diff;
if (nitems_diff == -1 && IS_FIXED(tb)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
b->hvalue = INVALID_HASH;
} else {
*bp = b->next;
free_term(tb, b);
b = *bp;
continue;
}
}
else {
if (nitems_diff && b->hvalue != INVALID_HASH)
break;
}
bp = &b->next;
b = b->next;
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
*ret = am_true;
return DB_ERROR_NONE;
}
/*
** This is for the ets:delete_object BIF
*/
static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
int ix;
HashDbTerm** bp;
HashDbTerm* b;
erts_smp_rwmtx_t* lck;
int nitems_diff = 0;
int nkeys = 0;
Eterm key;
key = GETKEY(tb, tuple_val(object));
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb, ix);
b = *bp;
while(b != 0) {
if (has_live_key(tb,b,key,hval)) {
++nkeys;
if (db_eq(&tb->common,object, &b->dbterm)) {
--nitems_diff;
if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
b->hvalue = INVALID_HASH; /* Pseudo remove */
bp = &b->next;
b = b->next;
} else {
*bp = b->next;
free_term(tb, b);
b = *bp;
}
if (tb->common.status & (DB_DUPLICATE_BAG)) {
continue;
} else {
break;
}
}
}
else if (nitems_diff && b->hvalue != INVALID_HASH) {
break;
}
bp = &b->next;
b = b->next;
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
*ret = am_true;
return DB_ERROR_NONE;
}
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
erts_smp_rwmtx_t* lck;
Sint slot;
int retval;
int nactive;
if (is_not_small(slot_term) || ((slot = signed_val(slot_term)) < 0)) {
return DB_ERROR_BADPARAM;
}
lck = RLOCK_HASH(tb, slot);
nactive = NACTIVE(tb);
if (slot < nactive) {
*ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb);
retval = DB_ERROR_NONE;
}
else if (slot == nactive) {
*ret = am_EOT;
retval = DB_ERROR_NONE;
}
else {
retval = DB_ERROR_BADPARAM;
}
RUNLOCK_HASH(lck);
return retval;
}
/*
* This is just here so I can take care of the return value
* that is to be sent during a trap (the BIF_TRAP macros explicitly returns)
*/
static BIF_RETTYPE bif_trap1(Export *bif,
Process *p,
Eterm p1)
{
BIF_TRAP1(bif, p, p1);
}
/*
* Match traversal callbacks
*/
/* Called when no match is possible.
* context_ptr: Pointer to context
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
*/
typedef int (*mtraversal_on_nothing_can_match_t)(void* context_ptr, Eterm* ret);
/* Called for each match result.
* context_ptr: Pointer to context
* slot_ix: Current slot index
* current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
* can be (carefully) used to adjust iteration when deleting or replacing elements.
* match_res: The result of running the match program against the current term.
*
* Should return 1 for successful match, 0 otherwise.
*/
typedef int (*mtraversal_on_match_res_t)(void* context_ptr, Sint slot_ix, HashDbTerm*** current_ptr_ptr,
Eterm match_res);
/* Called when either we've matched enough elements in this cycle or EOT was reached.
* context_ptr: Pointer to context
* slot_ix: Current slot index
* got: How many elements have been matched so far
* iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
* mpp: Double pointer to the compiled match program
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
* If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
*/
typedef int (*mtraversal_on_loop_ended_t)(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret);
/* Called when it's time to trap
* context_ptr: Pointer to context
* slot_ix: Current slot index
* got: How many elements have been matched so far
* mpp: Double pointer to the compiled match program
* ret: Pointer to traversal function term return.
*
* Both the direct return value and 'ret' are used as the traversal function return values.
* If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
*/
typedef int (*mtraversal_on_trap_t)(void* context_ptr, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret);
/*
* Begin hash table match traversal
*/
static int match_traverse(Process* p, DbTableHash* tb,
Eterm pattern,
extra_match_validator_t extra_match_validator, /* Optional */
Sint chunk_size, /* If 0, no chunking */
Sint iterations_left, /* Nr. of iterations left */
Eterm** hpp, /* Heap */
int lock_for_write, /* Set to 1 if we're going to delete or
modify existing terms */
mtraversal_on_nothing_can_match_t on_nothing_can_match,
mtraversal_on_match_res_t on_match_res,
mtraversal_on_loop_ended_t on_loop_ended,
mtraversal_on_trap_t on_trap,
void* context_ptr, /* State for callbacks above */
Eterm* ret)
{
Sint slot_ix = -1; /* Slot index */
HashDbTerm** current_ptr = NULL; /* Refers to either the bucket pointer or
* the 'next' pointer in the previous term
*/
HashDbTerm* saved_current = NULL; /* Helper to avoid double skip on match */
struct mp_info mpi;
unsigned current_list_pos = 0; /* Prefound buckets list index */
Eterm match_res;
Sint got = 0; /* Matched terms counter */
erts_smp_rwmtx_t* lck = NULL; /* Slot lock */
int ret_value = DB_ERROR_NONE;
#ifdef ERTS_SMP
erts_smp_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
= (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
void (*unlock_hash_function)(erts_smp_rwmtx_t*)
= (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
#else
#define lock_hash_function(tb, hval) NULL
#define unlock_hash_function(lck) ((void)lck)
#endif
Sint (*next_slot_function)(DbTableHash*, Uint, erts_smp_rwmtx_t**)
= (lock_for_write ? next_slot_w : next_slot);
*ret = NIL;
if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
!= DB_ERROR_NONE)
{
goto done;
}
if (!mpi.something_can_match) {
/* Can't possibly match anything */
ret_value = on_nothing_can_match(context_ptr, ret);
goto done;
}
if (mpi.all_objects) {
mpi.mp->flags |= BIN_FLAG_ALL_OBJECTS;
}
/*
* Look for initial slot / bucket
*/
if (!mpi.key_given) {
/* Run this code if pattern is variable or GETKEY(pattern) */
/* is a variable */
slot_ix = 0;
lck = lock_hash_function(tb,slot_ix);
for (;;) {
ASSERT(slot_ix < NACTIVE(tb));
if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
break;
}
slot_ix = next_slot_function(tb,slot_ix,&lck);
if (slot_ix == 0) {
ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret);
goto done;
}
}
} else {
/* We have at least one */
slot_ix = mpi.lists[current_list_pos].ix;
lck = lock_hash_function(tb, slot_ix);
current_ptr = mpi.lists[current_list_pos].bucket;
ASSERT(*current_ptr == BUCKET(tb,slot_ix));
++current_list_pos;
}
/*
* Execute traversal cycle
*/
for(;;) {
if (*current_ptr != NULL) {
if ((*current_ptr)->hvalue != INVALID_HASH) {
match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0,
&(*current_ptr)->dbterm, hpp, 2);
saved_current = *current_ptr;
if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) {
++got;
}
--iterations_left;
if (*current_ptr != saved_current) {
/* Don't advance to next, the callback did it already */
continue;
}
}
current_ptr = &((*current_ptr)->next);
}
else if (mpi.key_given) { /* Key is bound */
unlock_hash_function(lck);
if (current_list_pos == mpi.num_lists) {
ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret);
goto done;
} else {
slot_ix = mpi.lists[current_list_pos].ix;
lck = lock_hash_function(tb, slot_ix);
current_ptr = mpi.lists[current_list_pos].bucket;
ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
++current_list_pos;
}
}
else { /* Key is variable */
if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
slot_ix = -1;
break;
}
if (chunk_size && got >= chunk_size) {
unlock_hash_function(lck);
break;
}
if (iterations_left <= 0 || MBUF(p)) {
/*
* We have either reached our limit, or just created some heap fragments.
* Since many heap fragments will make the GC slower, trap and GC now.
*/
unlock_hash_function(lck);
ret_value = on_trap(context_ptr, slot_ix, got, &mpi.mp, ret);
goto done;
}
current_ptr = &BUCKET(tb,slot_ix);
}
}
ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret);
done:
/* We should only jump directly to this label if
* we've already called on_nothing_can_match / on_loop_ended / on_trap
*/
if (mpi.mp != NULL) {
erts_bin_free(mpi.mp);
}
if (mpi.lists != mpi.dlists) {
erts_free(ERTS_ALC_T_DB_SEL_LIST,
(void *) mpi.lists);
}
return ret_value;
#ifndef SMP
#undef lock_hash_function
#undef unlock_hash_function
#endif
}
/*
* Continue hash table match traversal
*/
static int match_traverse_continue(Process* p, DbTableHash* tb,
Sint chunk_size, /* If 0, no chunking */
Sint iterations_left, /* Nr. of iterations left */
Eterm** hpp, /* Heap */
Sint slot_ix, /* Slot index to resume traversal from */
Sint got, /* Matched terms counter */
Binary** mpp, /* Existing match program */
int lock_for_write, /* Set to 1 if we're going to delete or
modify existing terms */
mtraversal_on_match_res_t on_match_res,
mtraversal_on_loop_ended_t on_loop_ended,
mtraversal_on_trap_t on_trap,
void* context_ptr, /* For callbacks */
Eterm* ret)
{
int all_objects = (*mpp)->flags & BIN_FLAG_ALL_OBJECTS;
HashDbTerm** current_ptr = NULL; /* Refers to either the bucket pointer or
* the 'next' pointer in the previous term
*/
HashDbTerm* saved_current = NULL; /* Helper to avoid double skip on match */
Eterm match_res = NIL;
erts_smp_rwmtx_t* lck = NULL;
int ret_value = DB_ERROR_NONE;
#ifdef ERTS_SMP
erts_smp_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
= (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
void (*unlock_hash_function)(erts_smp_rwmtx_t*)
= (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
#else
#define lock_hash_function(tb, hval) NULL
#define unlock_hash_function(lck) ((void)lck)
#endif
Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_smp_rwmtx_t** lck_ptr)
= (lock_for_write ? next_slot_w : next_slot);
*ret = NIL;
if (got < 0)
return DB_ERROR_BADPARAM;
if (slot_ix < 0 /* EOT */
|| (chunk_size && got >= chunk_size))
{
/* Already got all or enough in the match_list */
ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret);
goto done;
}
lck = lock_hash_function(tb, slot_ix);
if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
unlock_hash_function(lck);
ret_value = DB_ERROR_BADPARAM;
goto done;
}
/*
* Resume traversal cycle from where we left
*/
current_ptr = &BUCKET(tb,slot_ix);
for(;;) {
if (*current_ptr != NULL) {
if ((*current_ptr)->hvalue != INVALID_HASH) {
match_res = db_match_dbterm(&tb->common, p, *mpp, all_objects,
&(*current_ptr)->dbterm, hpp, 2);
saved_current = *current_ptr;
if (on_match_res(context_ptr, slot_ix, ¤t_ptr, match_res)) {
++got;
}
--iterations_left;
if (*current_ptr != saved_current) {
/* Don't advance to next, the callback did it already */
continue;
}
}
current_ptr = &((*current_ptr)->next);
}
else {
if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
slot_ix = -1;
break;
}
if (chunk_size && got >= chunk_size) {
unlock_hash_function(lck);
break;
}
if (iterations_left <= 0 || MBUF(p)) {
/*
* We have either reached our limit, or just created some heap fragments.
* Since many heap fragments will make the GC slower, trap and GC now.
*/
unlock_hash_function(lck);
ret_value = on_trap(context_ptr, slot_ix, got, mpp, ret);
goto done;
}
current_ptr = &BUCKET(tb,slot_ix);
}
}
ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret);
done:
/* We should only jump directly to this label if
* we've already called on_loop_ended / on_trap
*/
return ret_value;
#ifndef SMP
#undef lock_hash_function
#undef unlock_hash_function
#endif
}
/*
* Common traversal trapping/continuation code;
* used by select_count, select_delete and select_replace,
* as well as their continuation-handling counterparts.
*/
static ERTS_INLINE int on_mtraversal_simple_trap(Export* trap_function,
Process* p,
DbTableHash* tb,
Eterm tid,
Eterm* prev_continuation_tptr,
Sint slot_ix,
Sint got,
Binary** mpp,
Eterm* ret)
{
Eterm* hp = NULL;
Eterm egot = NIL;
Eterm mpb = NIL;
Eterm continuation = NIL;
int is_first_trap = (prev_continuation_tptr == NULL);
size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);
BUMP_ALL_REDS(p);
if (IS_USMALL(0, got)) {
hp = HAlloc(p, base_halloc_sz + 5);
egot = make_small(got);
}
else {
hp = HAlloc(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5);
egot = uint_to_big(got, hp);
hp += BIG_UINT_HEAP_SIZE;
}
if (is_first_trap) {
mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
*mpp = NULL; /* otherwise the caller will destroy it */
}
else {
mpb = prev_continuation_tptr[3];
}
continuation = TUPLE4(
hp,
tid,
make_small(slot_ix),
mpb,
egot);
*ret = bif_trap1(trap_function, p, continuation);
return DB_ERROR_NONE;
}
static ERTS_INLINE int unpack_simple_mtraversal_continuation(Eterm continuation,
Eterm** tptr_ptr,
Eterm* tid_ptr,
Sint* slot_ix_p,
Binary** mpp,
Sint* got_p)
{
Eterm* tptr = NULL;
ASSERT(is_tuple(continuation));
tptr = tuple_val(continuation);
if (arityval(*tptr) != 4)
return 1;
if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
return 1;
}
*tptr_ptr = tptr;
*tid_ptr = tptr[1];
*slot_ix_p = unsigned_val(tptr[2]);
*mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
if (is_big(tptr[4])) {
*got_p = big_to_uint32(tptr[4]);
}
else {
*got_p = unsigned_val(tptr[4]);
}
return 0;
}
/*
*
* select / select_chunk match traversal
*
*/
#define MAX_SELECT_CHUNK_ITERATIONS 1000
typedef struct {
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* hp;
Sint chunk_size;
Eterm match_list;
Eterm* prev_continuation_tptr;
} mtraversal_select_chunk_context_t;
static int mtraversal_select_chunk_on_nothing_can_match(void* context_ptr, Eterm* ret) {
mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
*ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL);
return DB_ERROR_NONE;
}
static int mtraversal_select_chunk_on_match_res(void* context_ptr, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
if (is_value(match_res)) {
sc_context_ptr->match_list = CONS(sc_context_ptr->hp, match_res, sc_context_ptr->match_list);
return 1;
}
return 0;
}
static int mtraversal_select_chunk_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret)
{
mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
Eterm mpb = NIL;
if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
/* We didn't get to iterate a single time, which means EOT */
ASSERT(sc_context_ptr->match_list == NIL);
*ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL);
return DB_ERROR_NONE;
}
else {
ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
if (sc_context_ptr->chunk_size) {
Eterm continuation;
Eterm rest = NIL;
Sint rest_size = 0;
if (got > sc_context_ptr->chunk_size) { /* Split list in return value and 'rest' */
Eterm tmp = sc_context_ptr->match_list;
rest = sc_context_ptr->match_list;
while (got-- > sc_context_ptr->chunk_size + 1) {
tmp = CDR(list_val(tmp));
++rest_size;
}
++rest_size;
sc_context_ptr->match_list = CDR(list_val(tmp));
CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
been in 'user space' */
}
if (rest != NIL || slot_ix >= 0) { /* Need more calls */
sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3 + 7 + ERTS_MAGIC_REF_THING_SIZE);
mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &sc_context_ptr->hp);
continuation = TUPLE6(
sc_context_ptr->hp,
sc_context_ptr->tid,
make_small(slot_ix),
make_small(sc_context_ptr->chunk_size),
mpb, rest,
make_small(rest_size));
*mpp = NULL; /* Otherwise the caller will destroy it */
sc_context_ptr->hp += 7;
*ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, continuation);
return DB_ERROR_NONE;
} else { /* All data is exhausted */
if (sc_context_ptr->match_list != NIL) { /* No more data to search but still a
result to return to the caller */
sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3);
*ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, am_EOT);
return DB_ERROR_NONE;
} else { /* Reached the end of the ttable with no data to return */
*ret = am_EOT;
return DB_ERROR_NONE;
}
}
}
*ret = sc_context_ptr->match_list;
return DB_ERROR_NONE;
}
}
static int mtraversal_select_chunk_on_trap(void* context_ptr, Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
Eterm mpb = NIL;
Eterm continuation = NIL;
Eterm* hp = NULL;
BUMP_ALL_REDS(sc_context_ptr->p);
if (sc_context_ptr->prev_continuation_tptr == NULL) {
/* First time we're trapping */
hp = HAlloc(sc_context_ptr->p, 7 + ERTS_MAGIC_REF_THING_SIZE);
mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &hp);
continuation = TUPLE6(
hp,
sc_context_ptr->tid,
make_small(slot_ix),
make_small(sc_context_ptr->chunk_size),
mpb,
sc_context_ptr->match_list,
make_small(got));
*mpp = NULL; /* otherwise the caller will destroy it */
}
else {
/* Not the first time we're trapping; reuse continuation terms */
hp = HAlloc(sc_context_ptr->p, 7);
continuation = TUPLE6(
hp,
sc_context_ptr->prev_continuation_tptr[1],
make_small(slot_ix),
sc_context_ptr->prev_continuation_tptr[3],
sc_context_ptr->prev_continuation_tptr[4],
sc_context_ptr->match_list,
make_small(got));
}
*ret = bif_trap1(&ets_select_continue_exp, sc_context_ptr->p, continuation);
return DB_ERROR_NONE;
}
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) {
return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
}
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret)
{
mtraversal_select_chunk_context_t sc_context = {0};
sc_context.p = p;
sc_context.tb = &tbl->hash;
sc_context.tid = tid;
sc_context.hp = NULL;
sc_context.chunk_size = chunk_size;
sc_context.match_list = NIL;
sc_context.prev_continuation_tptr = NULL;
return match_traverse(
sc_context.p, sc_context.tb,
pattern, NULL,
sc_context.chunk_size,
MAX_SELECT_CHUNK_ITERATIONS,
&sc_context.hp, 0,
mtraversal_select_chunk_on_nothing_can_match,
mtraversal_select_chunk_on_match_res,
mtraversal_select_chunk_on_loop_ended,
mtraversal_select_chunk_on_trap,
&sc_context, ret);
}
/*
*
* select_continue match traversal
*
*/
static int mtraversal_select_chunk_continue_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret)
{
mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
Eterm continuation = NIL;
Eterm rest = NIL;
Eterm* hp = NULL;
ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
if (sc_context_ptr->chunk_size) {
Sint rest_size = 0;
if (got > sc_context_ptr->chunk_size) {
/* Cannot write destructively here,
the list may have
been in user space */
rest = NIL;
hp = HAlloc(sc_context_ptr->p, (got - sc_context_ptr->chunk_size) * 2);
while (got-- > sc_context_ptr->chunk_size) {
rest = CONS(hp, CAR(list_val(sc_context_ptr->match_list)), rest);
hp += 2;
sc_context_ptr->match_list = CDR(list_val(sc_context_ptr->match_list));
++rest_size;
}
}
if (rest != NIL || slot_ix >= 0) {
hp = HAlloc(sc_context_ptr->p, 3 + 7);
continuation = TUPLE6(
hp,
sc_context_ptr->prev_continuation_tptr[1],
make_small(slot_ix),
sc_context_ptr->prev_continuation_tptr[3],
sc_context_ptr->prev_continuation_tptr[4],
rest,
make_small(rest_size));
hp += 7;
*ret = TUPLE2(hp, sc_context_ptr->match_list, continuation);
return DB_ERROR_NONE;
} else {
if (sc_context_ptr->match_list != NIL) {
hp = HAlloc(sc_context_ptr->p, 3);
*ret = TUPLE2(hp, sc_context_ptr->match_list, am_EOT);
return DB_ERROR_NONE;
} else {
*ret = am_EOT;
return DB_ERROR_NONE;
}
}
}
*ret = sc_context_ptr->match_list;
return DB_ERROR_NONE;
}
/*
* This is called when select traps
*/
static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
mtraversal_select_chunk_context_t sc_context = {0};
Eterm* tptr = NULL;
Eterm tid = NIL;
Binary* mp = NULL;
Sint got = 0;
Sint slot_ix = 0;
Sint chunk_size = 0;
Eterm match_list = NIL;
Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;
*ret = NIL;
/* Decode continuation. We know it's a tuple but not the arity or anything else */
ASSERT(is_tuple(continuation));
tptr = tuple_val(continuation);
if (arityval(*tptr) != 6)
return DB_ERROR_BADPARAM;
if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
!(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
return DB_ERROR_BADPARAM;
if ((chunk_size = signed_val(tptr[3])) < 0)
return DB_ERROR_BADPARAM;
mp = erts_db_get_match_prog_binary(tptr[4]);
if (mp == NULL)
return DB_ERROR_BADPARAM;
if ((got = signed_val(tptr[6])) < 0)
return DB_ERROR_BADPARAM;
tid = tptr[1];
slot_ix = signed_val(tptr[2]);
match_list = tptr[5];
/* Proceed */
sc_context.p = p;
sc_context.tb = &tbl->hash;
sc_context.tid = tid;
sc_context.hp = NULL;
sc_context.chunk_size = chunk_size;
sc_context.match_list = match_list;
sc_context.prev_continuation_tptr = tptr;
return match_traverse_continue(
sc_context.p, sc_context.tb, sc_context.chunk_size,
iterations_left, &sc_context.hp, slot_ix, got, &mp, 0,
mtraversal_select_chunk_on_match_res, /* Reuse callback */
mtraversal_select_chunk_continue_on_loop_ended,
mtraversal_select_chunk_on_trap, /* Reuse callback */
&sc_context, ret);
}
#undef MAX_SELECT_CHUNK_ITERATIONS
/*
*
* select_count match traversal
*
*/
#define MAX_SELECT_COUNT_ITERATIONS 1000
typedef struct {
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* hp;
Eterm* prev_continuation_tptr;
} mtraversal_select_count_context_t;
static int mtraversal_select_count_on_nothing_can_match(void* context_ptr, Eterm* ret) {
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int mtraversal_select_count_on_match_res(void* context_ptr, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
return (match_res == am_true);
}
static int mtraversal_select_count_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret)
{
mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr;
ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
BUMP_REDS(scnt_context_ptr->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left);
*ret = erts_make_integer(got, scnt_context_ptr->p);
return DB_ERROR_NONE;
}
static int mtraversal_select_count_on_trap(void* context_ptr, Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr;
return on_mtraversal_simple_trap(
&ets_select_count_continue_exp,
scnt_context_ptr->p,
scnt_context_ptr->tb,
scnt_context_ptr->tid,
scnt_context_ptr->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) {
mtraversal_select_count_context_t scnt_context = {0};
Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS;
Sint chunk_size = 0;
scnt_context.p = p;
scnt_context.tb = &tbl->hash;
scnt_context.tid = tid;
scnt_context.hp = NULL;
scnt_context.prev_continuation_tptr = NULL;
return match_traverse(
scnt_context.p, scnt_context.tb,
pattern, NULL,
chunk_size, iterations_left, NULL, 0,
mtraversal_select_count_on_nothing_can_match,
mtraversal_select_count_on_match_res,
mtraversal_select_count_on_loop_ended,
mtraversal_select_count_on_trap,
&scnt_context, ret);
}
/*
* This is called when select_count traps
*/
static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
mtraversal_select_count_context_t scnt_context = {0};
Eterm* tptr = NULL;
Eterm tid = NIL;
Binary* mp = NULL;
Sint got = 0;
Sint slot_ix = 0;
Sint chunk_size = 0;
*ret = NIL;
if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
return DB_ERROR_BADPARAM;
}
scnt_context.p = p;
scnt_context.tb = &tbl->hash;
scnt_context.tid = tid;
scnt_context.hp = NULL;
scnt_context.prev_continuation_tptr = tptr;
return match_traverse_continue(
scnt_context.p, scnt_context.tb, chunk_size,
MAX_SELECT_COUNT_ITERATIONS,
NULL, slot_ix, got, &mp, 0,
mtraversal_select_count_on_match_res, /* Reuse callback */
mtraversal_select_count_on_loop_ended, /* Reuse callback */
mtraversal_select_count_on_trap, /* Reuse callback */
&scnt_context, ret);
}
#undef MAX_SELECT_COUNT_ITERATIONS
/*
*
* select_delete match traversal
*
*/
#define MAX_SELECT_DELETE_ITERATIONS 1000
typedef struct {
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* hp;
Eterm* prev_continuation_tptr;
erts_aint_t fixated_by_me;
Uint last_pseudo_delete;
} mtraversal_select_delete_context_t;
static int mtraversal_select_delete_on_nothing_can_match(void* context_ptr, Eterm* ret) {
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int mtraversal_select_delete_on_match_res(void* context_ptr, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
HashDbTerm** current_ptr = *current_ptr_ptr;
mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
HashDbTerm* del = NULL;
if (match_res != am_true)
return 0;
if (NFIXED(sd_context_ptr->tb) > sd_context_ptr->fixated_by_me) { /* fixated by others? */
if (slot_ix != sd_context_ptr->last_pseudo_delete) {
if (!add_fixed_deletion(sd_context_ptr->tb, slot_ix, sd_context_ptr->fixated_by_me))
goto do_erase;
sd_context_ptr->last_pseudo_delete = slot_ix;
}
(*current_ptr)->hvalue = INVALID_HASH;
}
else {
do_erase:
del = *current_ptr;
*current_ptr = (*current_ptr)->next; // replace pointer to term using next
free_term(sd_context_ptr->tb, del);
}
erts_smp_atomic_dec_nob(&sd_context_ptr->tb->common.nitems);
return 1;
}
static int mtraversal_select_delete_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret)
{
mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
BUMP_REDS(sd_context_ptr->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);
if (got) {
try_shrink(sd_context_ptr->tb);
}
*ret = erts_make_integer(got, sd_context_ptr->p);
return DB_ERROR_NONE;
}
static int mtraversal_select_delete_on_trap(void* context_ptr, Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
return on_mtraversal_simple_trap(
&ets_select_delete_continue_exp,
sd_context_ptr->p,
sd_context_ptr->tb,
sd_context_ptr->tid,
sd_context_ptr->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) {
mtraversal_select_delete_context_t sd_context = {0};
Sint chunk_size = 0;
sd_context.p = p;
sd_context.tb = &tbl->hash;
sd_context.tid = tid;
sd_context.hp = NULL;
sd_context.prev_continuation_tptr = NULL;
#ifdef ERTS_SMP
sd_context.fixated_by_me = sd_context.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
#else
sd_context.fixated_by_me = 0;
#endif
sd_context.last_pseudo_delete = (Uint) -1;
return match_traverse(
sd_context.p, sd_context.tb,
pattern, NULL,
chunk_size,
MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
mtraversal_select_delete_on_nothing_can_match,
mtraversal_select_delete_on_match_res,
mtraversal_select_delete_on_loop_ended,
mtraversal_select_delete_on_trap,
&sd_context, ret);
}
/*
* This is called when select_delete traps
*/
static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
mtraversal_select_delete_context_t sd_context = {0};
Eterm* tptr = NULL;
Eterm tid = NIL;
Binary* mp = NULL;
Sint got = 0;
Sint slot_ix = 0;
Sint chunk_size = 0;
*ret = NIL;
if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
return DB_ERROR_BADPARAM;
}
sd_context.p = p;
sd_context.tb = &tbl->hash;
sd_context.tid = tid;
sd_context.hp = NULL;
sd_context.prev_continuation_tptr = tptr;
sd_context.fixated_by_me = ONLY_WRITER(p, sd_context.tb) ? 0 : 1; /* TODO: something nicer */
sd_context.last_pseudo_delete = (Uint) -1;
return match_traverse_continue(
sd_context.p, sd_context.tb, chunk_size,
MAX_SELECT_DELETE_ITERATIONS,
NULL, slot_ix, got, &mp, 1,
mtraversal_select_delete_on_match_res, /* Reuse callback */
mtraversal_select_delete_on_loop_ended, /* Reuse callback */
mtraversal_select_delete_on_trap, /* Reuse callback */
&sd_context, ret);
}
#undef MAX_SELECT_DELETE_ITERATIONS
/*
*
* select_replace match traversal
*
*/
#define MAX_SELECT_REPLACE_ITERATIONS 1000
typedef struct {
Process* p;
DbTableHash* tb;
Eterm tid;
Eterm* hp;
Eterm* prev_continuation_tptr;
} mtraversal_select_replace_context_t;
static int mtraversal_select_replace_on_nothing_can_match(void* context_ptr, Eterm* ret) {
*ret = make_small(0);
return DB_ERROR_NONE;
}
static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_ix,
HashDbTerm*** current_ptr_ptr,
Eterm match_res)
{
mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
DbTableHash* tb = sr_context_ptr->tb;
#ifdef DEBUG
Eterm key = NIL;
#endif
HashDbTerm* new = NULL;
HashDbTerm* next = NULL;
HashValue hval = INVALID_HASH;
if (is_value(match_res)) {
#ifdef DEBUG
ASSERT(is_value(key = db_getkey(tb->common.keypos, match_res)));
ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
#endif
next = (**current_ptr_ptr)->next;
hval = (**current_ptr_ptr)->hvalue;
new = replace_dbterm(tb, **current_ptr_ptr, match_res);
new->next = next;
new->hvalue = hval;
**current_ptr_ptr = new; /* replace 'next' pointer in previous object */
*current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
return 1;
}
return 0;
}
static int mtraversal_select_replace_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
Sint iterations_left, Binary** mpp, Eterm* ret)
{
mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
/* the more objects we've replaced, the more reductions we've consumed */
BUMP_REDS(sr_context_ptr->p,
MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
(MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got));
*ret = erts_make_integer(got, sr_context_ptr->p);
return DB_ERROR_NONE;
}
static int mtraversal_select_replace_on_trap(void* context_ptr, Sint slot_ix, Sint got,
Binary** mpp, Eterm* ret)
{
mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
return on_mtraversal_simple_trap(
&ets_select_replace_continue_exp,
sr_context_ptr->p,
sr_context_ptr->tb,
sr_context_ptr->tid,
sr_context_ptr->prev_continuation_tptr,
slot_ix, got, mpp, ret);
}
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm pattern, Eterm *ret)
{
mtraversal_select_replace_context_t sr_context = {0};
Sint chunk_size = 0;
/* Bag implementation presented both semantic consistency and performance issues,
* unsupported for now
*/
ASSERT(!(tbl->hash.common.status & DB_BAG));
sr_context.p = p;
sr_context.tb = &tbl->hash;
sr_context.tid = NIL; // TODO
sr_context.hp = NULL;
sr_context.prev_continuation_tptr = NULL;
return match_traverse(
sr_context.p, sr_context.tb,
pattern, db_match_keeps_key,
chunk_size,
MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
mtraversal_select_replace_on_nothing_can_match,
mtraversal_select_replace_on_match_res,
mtraversal_select_replace_on_loop_ended,
mtraversal_select_replace_on_trap,
&sr_context, ret);
}
/*
* This is called when select_replace traps
*/
static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
{
mtraversal_select_replace_context_t sr_context = {0};
Eterm* tptr = NULL;
Eterm tid = NIL;
Binary* mp = NULL;
Sint got = 0;
Sint slot_ix = 0;
Sint chunk_size = 0;
*ret = NIL;
if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
return DB_ERROR_BADPARAM;
}
/* Proceed */
sr_context.p = p;
sr_context.tb = &tbl->hash;
sr_context.tid = tid;
sr_context.hp = NULL;
sr_context.prev_continuation_tptr = tptr;
return match_traverse_continue(
sr_context.p, sr_context.tb, chunk_size,
MAX_SELECT_REPLACE_ITERATIONS,
NULL, slot_ix, got, &mp, 1,
mtraversal_select_replace_on_match_res, /* Reuse callback */
mtraversal_select_replace_on_loop_ended, /* Reuse callback */
mtraversal_select_replace_on_trap, /* Reuse callback */
&sr_context, ret);
}
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp, *b;
HashValue hval = MAKE_HASH(key);
erts_smp_rwmtx_t *lck = WLOCK_HASH(tb, hval);
int ix = hash_to_ix(tb, hval);
int nitems_diff = 0;
*ret = NIL;
for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) {
if (has_live_key(tb, b, key, hval)) {
HashDbTerm *bend;
*ret = get_term_list(p, tb, key, hval, b, &bend);
while (b != bend) {
--nitems_diff;
if (nitems_diff == -1 && IS_FIXED(tb)
&& add_fixed_deletion(tb, ix, 0)) {
/* Pseudo remove (no need to keep several of same key) */
bp = &b->next;
b->hvalue = INVALID_HASH;
b = b->next;
} else {
*bp = b->next;
free_term(tb, b);
b = *bp;
}
}
break;
}
}
WUNLOCK_HASH(lck);
if (nitems_diff) {
erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
try_shrink(tb);
}
return DB_ERROR_NONE;
}
/*
** Other interface routines (not directly coupled to one bif)
*/
void db_initialize_hash(void)
{
}
int db_mark_all_deleted_hash(DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm* list;
int i;
ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));
for (i = 0; i < NACTIVE(tb); i++) {
if ((list = BUCKET(tb,i)) != NULL) {
add_fixed_deletion(tb, i, 0);
do {
list->hvalue = INVALID_HASH;
list = list->next;
}while(list != NULL);
}
}
erts_smp_atomic_set_nob(&tb->common.nitems, 0);
return DB_ERROR_NONE;
}
/* Display hash table contents (for dump) */
static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
DbHashStats stats;
int i;
erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));
#ifdef ERTS_SMP
i = tbl->common.is_thread_safe;
/* If crash dumping we set table to thread safe in order to
avoid taking any locks */
if (ERTS_IS_CRASH_DUMPING)
tbl->common.is_thread_safe = 1;
db_calc_stats_hash(&tbl->hash, &stats);
tbl->common.is_thread_safe = i;
#else
db_calc_stats_hash(&tbl->hash, &stats);
#endif
erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len);
erts_print(to, to_arg, "Chain Length Std Dev: %f\n",
stats.std_dev_chain_len);
erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n",
stats.std_dev_expected);
if (IS_FIXED(tb))
erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items);
else
erts_print(to, to_arg, "Fixed: false\n");
if (show) {
for (i = 0; i < NACTIVE(tb); i++) {
HashDbTerm* list = BUCKET(tb,i);
if (list == NULL)
continue;
erts_print(to, to_arg, "%d: [", i);
while(list != 0) {
if (list->hvalue == INVALID_HASH)
erts_print(to, to_arg, "*");
if (tb->common.compress) {
Eterm key = GETKEY(tb, list->dbterm.tpl);
erts_print(to, to_arg, "key=%T", key);
}
else {
Eterm obj = make_tuple(list->dbterm.tpl);
erts_print(to, to_arg, "%T", obj);
}
if (list->next != 0)
erts_print(to, to_arg, ",");
list = list->next;
}
erts_print(to, to_arg, "]\n");
}
}
}
/* release all memory occupied by a single table */
static int db_free_table_hash(DbTable *tbl)
{
while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0)
;
return 0;
}
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
DbTableHash *tb = &tbl->hash;
FixedDeletion* fixdel = (FixedDeletion*) erts_smp_atomic_read_acqb(&tb->fixdel);
ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));
while (fixdel != NULL) {
FixedDeletion *fx = fixdel;
fixdel = fx->next;
erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
(DbTable *) tb,
(void *) fx,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
if (--reds < 0) {
erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
return reds; /* Not done */
}
}
erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);
while(tb->nslots != 0) {
reds -= EXT_SEGSZ/64 + free_seg(tb, 1);
/*
* If we have done enough work, get out here.
*/
if (reds < 0) {
return reds; /* Not done */
}
}
#ifdef ERTS_SMP
if (tb->locks != NULL) {
int i;
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
erts_rwmtx_destroy(GET_LOCK(tb,i));
}
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
(void*)tb->locks, sizeof(DbTableHashFineLocks));
tb->locks = NULL;
}
#endif
ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
return reds; /* Done */
}
/*
** Utility routines. (static)
*/
/*
** For the select functions, analyzes the pattern and determines which
** slots should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
Eterm *ptpl;
Eterm lst, tpl, ttpl;
Eterm *matches,*guards, *bodies;
Eterm sbuff[30];
Eterm *buff = sbuff;
Eterm key = NIL;
HashValue hval = NIL;
int num_heads = 0;
int i;
mpi->lists = mpi->dlists;
mpi->num_lists = 0;
mpi->key_given = 1;
mpi->something_can_match = 0;
mpi->all_objects = 1;
mpi->mp = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
++num_heads;
if (lst != NIL) {/* proper list... */
return DB_ERROR_BADPARAM;
}
if (num_heads > 10) {
buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
sizeof(*(mpi->lists)) * num_heads);
}
matches = buff;
guards = buff + num_heads;
bodies = buff + (num_heads * 2);
i = 0;
for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
Eterm match = NIL;
Eterm guard = NIL;
Eterm body = NIL;
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
ptpl = tuple_val(ttpl);
if (ptpl[0] != make_arityval(3U)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
matches[i] = match = tpl = ptpl[1];
guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
if (!is_list(body) || CDR(list_val(body)) != NIL ||
CAR(list_val(body)) != am_DollarUnderscore) {
mpi->all_objects = 0;
}
++i;
if (!(mpi->key_given)) {
continue;
}
if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
(mpi->key_given) = 0;
(mpi->something_can_match) = 1;
} else {
key = db_getkey(tb->common.keypos, tpl);
if (is_value(key)) {
if (!db_has_variable(key)) { /* Bound key */
int ix, search_slot;
HashDbTerm** bp;
erts_smp_rwmtx_t* lck;
hval = MAKE_HASH(key);
lck = RLOCK_HASH(tb,hval);
ix = hash_to_ix(tb, hval);
bp = &BUCKET(tb,ix);
if (lck == NULL) {
search_slot = search_list(tb,key,hval,*bp) != NULL;
} else {
/* No point to verify if key exist now as there may be
concurrent inserters/deleters anyway */
RUNLOCK_HASH(lck);
search_slot = 1;
}
if (search_slot) {
int j;
for (j=0; ; ++j) {
if (j == mpi->num_lists) {
mpi->lists[mpi->num_lists].bucket = bp;
mpi->lists[mpi->num_lists].ix = ix;
++mpi->num_lists;
break;
}
if (mpi->lists[j].bucket == bp) {
ASSERT(mpi->lists[j].ix == ix);
break;
}
ASSERT(mpi->lists[j].ix != ix);
}
mpi->something_can_match = 1;
}
} else {
mpi->key_given = 0;
mpi->something_can_match = 1;
}
}
}
}
/*
* It would be nice not to compile the match_spec if nothing could match,
* but then the select calls would not fail like they should on bad
* match specs that happen to specify non existent keys etc.
*/
if ((mpi->mp = db_match_compile(matches, guards, bodies,
num_heads, DCOMP_TABLE, NULL))
== NULL) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_BADPARAM;
}
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
return DB_ERROR_NONE;
}
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
struct segment** old_segtab = SEGTAB(tb);
int nsegs = 0;
struct ext_segtab* est;
ASSERT(seg_ix >= NSEG_1);
switch (seg_ix) {
case NSEG_1: nsegs = NSEG_2; break;
default: nsegs = seg_ix + NSEG_INC; break;
}
ASSERT(nsegs > tb->nsegs);
est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_EXT_SEGTAB(nsegs));
est->nsegs = nsegs;
est->prev_segtab = old_segtab;
est->prev_nsegs = tb->nsegs;
sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
return est;
}
/* Extend table with one new segment
*/
static void alloc_seg(DbTableHash *tb)
{
int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
struct segment** segtab;
ASSERT(seg_ix > 0);
if (seg_ix == tb->nsegs) { /* New segtab needed */
struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
SET_SEGTAB(tb, est->segtab);
tb->nsegs = est->nsegs;
}
ASSERT(seg_ix < tb->nsegs);
segtab = SEGTAB(tb);
segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
(DbTable *) tb,
SIZEOF_SEGMENT(EXT_SEGSZ));
sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ));
tb->nslots += EXT_SEGSZ;
}
#ifdef ERTS_SMP
static void dealloc_ext_segtab(void* lop_data)
{
struct ext_segtab* est = (struct ext_segtab*) lop_data;
erts_free(ERTS_ALC_T_DB_SEG, est);
}
#endif
/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty
*/
static int free_seg(DbTableHash *tb, int free_records)
{
const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
struct segment** const segtab = SEGTAB(tb);
struct segment* const segp = segtab[seg_ix];
Uint seg_sz;
int nrecords = 0;
ASSERT(segp != NULL);
#ifndef DEBUG
if (free_records)
#endif
{
int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
while (i--) {
HashDbTerm* p = segp->buckets[i];
while(p != 0) {
HashDbTerm* nxt = p->next;
ASSERT(free_records); /* segment not empty as assumed? */
free_term(tb, p);
p = nxt;
++nrecords;
}
}
}
if (seg_ix >= NSEG_1) {
struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);
if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
ASSERT(est->prev_segtab != NULL);
SET_SEGTAB(tb, est->prev_segtab);
tb->nsegs = est->prev_nsegs;
#ifdef ERTS_SMP
if (!tb->common.is_thread_safe) {
/*
* Table is doing a graceful shrink operation and we must avoid
* deallocating this segtab while it may still be read by other
* threads. Schedule deallocation with thread progress to make
* sure no lingering threads are still hanging in BUCKET macro
* with an old segtab pointer.
*/
Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
est,
&est->lop,
sz);
}
else
#endif
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
SIZEOF_EXT_SEGTAB(est->nsegs));
}
}
seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));
#ifdef DEBUG
if (seg_ix < tb->nsegs)
SEGTAB(tb)[seg_ix] = NULL;
#endif
tb->nslots -= seg_sz;
ASSERT(tb->nslots >= 0);
return nrecords;
}
/*
** Copy terms from ptr1 until ptr2
** works for ptr1 == ptr2 == 0 => []
** or ptr2 == 0
** sz is either precalculated heap size or 0 if not known
*/
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash* tb)
{
HashDbTerm* ptr;
Eterm list = NIL;
Eterm copy;
Eterm *hp, *hend;
if (!sz) {
ptr = ptr1;
while(ptr != ptr2) {
if (ptr->hvalue != INVALID_HASH)
sz += ptr->dbterm.size + 2;
ptr = ptr->next;
}
}
hp = HAlloc(p, sz);
hend = hp + sz;
ptr = ptr1;
while(ptr != ptr2) {
if (ptr->hvalue != INVALID_HASH) {
copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p));
list = CONS(hp, copy, list);
hp += 2;
}
ptr = ptr->next;
}
HRelease(p,hend,hp);
return list;
}
static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
#ifdef ERTS_SMP
if (DB_USING_FINE_LOCKING(tb))
return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
else
ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
#endif
return 1;
}
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
#ifdef ERTS_SMP
if (DB_USING_FINE_LOCKING(tb))
erts_atomic_set_relb(&tb->is_resizing, 0);
#endif
}
/* Grow table with one or more new buckets.
** Allocate new segment if needed.
*/
static void grow(DbTableHash* tb, int nitems)
{
HashDbTerm** pnext;
HashDbTerm** to_pnext;
HashDbTerm* p;
erts_smp_rwmtx_t* lck;
int nactive;
int from_ix, to_ix;
int szm;
int loop_limit = 5;
do {
if (!begin_resizing(tb))
return; /* already in progress */
nactive = NACTIVE(tb);
if (nitems <= GROW_LIMIT(nactive)) {
goto abort; /* already done (race) */
}
/* Ensure that the slot nactive exists */
if (nactive == tb->nslots) {
/* Time to get a new segment */
ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
alloc_seg(tb);
}
ASSERT(nactive < tb->nslots);
szm = erts_smp_atomic_read_nob(&tb->szm);
if (nactive <= szm) {
from_ix = nactive & (szm >> 1);
} else {
ASSERT(nactive == szm+1);
from_ix = 0;
szm = (szm<<1) | 1;
}
to_ix = nactive;
lck = WLOCK_HASH(tb, from_ix);
ERTS_SMP_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
/* Now a final double check (with the from_ix lock held)
* that we did not get raced by a table fixer.
*/
if (IS_FIXED(tb)) {
WUNLOCK_HASH(lck);
goto abort;
}
erts_smp_atomic_set_nob(&tb->nactive, ++nactive);
if (from_ix == 0) {
if (DB_USING_FINE_LOCKING(tb))
erts_smp_atomic_set_relb(&tb->szm, szm);
else
erts_smp_atomic_set_nob(&tb->szm, szm);
}
done_resizing(tb);
/* Finally, let's split the bucket. We try to do it in a smart way
to keep link order and avoid unnecessary updates of next-pointers */
pnext = &BUCKET(tb, from_ix);
p = *pnext;
to_pnext = &BUCKET(tb, to_ix);
while (p != NULL) {
if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */
*pnext = p->next;
free_term(tb, p);
p = *pnext;
}
else {
int ix = p->hvalue & szm;
if (ix != from_ix) {
ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
*to_pnext = p;
/* Swap "from" and "to": */
from_ix = ix;
to_pnext = pnext;
}
pnext = &p->next;
p = *pnext;
}
}
*to_pnext = NULL;
WUNLOCK_HASH(lck);
}while (--loop_limit && nitems > GROW_LIMIT(nactive));
return;
abort:
done_resizing(tb);
}
/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
*/
static void shrink(DbTableHash* tb, int nitems)
{
HashDbTerm** src_bp;
HashDbTerm** dst_bp;
HashDbTerm** bp;
erts_smp_rwmtx_t* lck;
int src_ix, dst_ix, low_szm;
int nactive;
int loop_limit = 5;
do {
if (!begin_resizing(tb))
return; /* already in progress */
nactive = NACTIVE(tb);
if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
goto abort; /* already done (race) */
}
src_ix = nactive - 1;
low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1;
dst_ix = src_ix & low_szm;
ASSERT(dst_ix < src_ix);
ASSERT(nactive > FIRST_SEGSZ);
lck = WLOCK_HASH(tb, dst_ix);
ERTS_SMP_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
/* Double check for racing table fixers */
if (IS_FIXED(tb)) {
WUNLOCK_HASH(lck);
goto abort;
}
src_bp = &BUCKET(tb, src_ix);
dst_bp = &BUCKET(tb, dst_ix);
bp = src_bp;
/*
* We join lists by appending "dst" at the end of "src"
* as we must step through "src" anyway to purge pseudo deleted.
*/
while(*bp != NULL) {
if ((*bp)->hvalue == INVALID_HASH) {
HashDbTerm* deleted = *bp;
*bp = deleted->next;
free_term(tb, deleted);
} else {
bp = &(*bp)->next;
}
}
*bp = *dst_bp;
*dst_bp = *src_bp;
*src_bp = NULL;
nactive = src_ix;
erts_smp_atomic_set_nob(&tb->nactive, nactive);
if (dst_ix == 0) {
erts_smp_atomic_set_relb(&tb->szm, low_szm);
}
WUNLOCK_HASH(lck);
if (tb->nslots - src_ix >= EXT_SEGSZ) {
free_seg(tb, 0);
}
done_resizing(tb);
} while (--loop_limit
&& nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
return;
abort:
done_resizing(tb);
}
/* Search a list of tuples for a matching key */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list)
{
while (list != 0) {
if (has_live_key(tb,list,key,hval))
return list;
list = list->next;
}
return 0;
}
/* This function is called by the next AND the select BIF */
/* It return the next live object in a table, NULL if no more */
/* In-bucket: RLOCKED */
/* Out-bucket: RLOCKED unless NULL */
static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
HashDbTerm *list)
{
int i;
ERTS_SMP_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));
for (list = list->next; list != NULL; list = list->next) {
if (list->hvalue != INVALID_HASH)
return list;
}
i = *iptr;
while ((i=next_slot(tb, i, lck_ptr)) != 0) {
list = BUCKET(tb,i);
while (list != NULL) {
if (list->hvalue != INVALID_HASH) {
*iptr = i;
return list;
}
list = list->next;
}
}
/* *iptr = ??? */
return NULL;
}
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle* handle)
{
DbTableHash *tb = &tbl->hash;
HashValue hval;
HashDbTerm **bp, *b;
erts_smp_rwmtx_t* lck;
int flags = 0;
ASSERT(tb->common.status & DB_SET);
hval = MAKE_HASH(key);
lck = WLOCK_HASH(tb, hval);
bp = &BUCKET(tb, hash_to_ix(tb, hval));
b = *bp;
for (;;) {
if (b == NULL) {
break;
}
if (has_key(tb, b, key, hval)) {
if (b->hvalue != INVALID_HASH) {
goto Ldone;
}
break;
}
bp = &b->next;
b = *bp;
}
if (obj == THE_NON_VALUE) {
WUNLOCK_HASH(lck);
return 0;
}
{
Eterm *objp = tuple_val(obj);
int arity = arityval(*objp);
Eterm *htop, *hend;
ASSERT(arity >= tb->common.keypos);
htop = HAlloc(p, arity + 1);
hend = htop + arity + 1;
sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
htop[tb->common.keypos] = key;
obj = make_tuple(htop);
if (b == NULL) {
HashDbTerm *q = new_dbterm(tb, obj);
q->hvalue = hval;
q->next = NULL;
*bp = b = q;
flags |= DB_INC_TRY_GROW;
} else {
HashDbTerm *q, *next = b->next;
ASSERT(b->hvalue == INVALID_HASH);
q = replace_dbterm(tb, b, obj);
q->next = next;
q->hvalue = hval;
*bp = b = q;
erts_smp_atomic_inc_nob(&tb->common.nitems);
}
HRelease(p, hend, htop);
flags |= DB_NEW_OBJECT;
}
Ldone:
handle->tb = tbl;
handle->bp = (void **)bp;
handle->dbterm = &b->dbterm;
handle->flags = flags;
handle->new_size = b->dbterm.size;
handle->lck = lck;
return 1;
}
/* Must be called after call to db_lookup_dbterm
*/
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
{
DbTable* tbl = handle->tb;
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp = (HashDbTerm **) handle->bp;
HashDbTerm *b = *bp;
erts_smp_rwmtx_t* lck = (erts_smp_rwmtx_t*) handle->lck;
HashDbTerm* free_me = NULL;
ERTS_SMP_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */
ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));
if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
0)) {
b->hvalue = INVALID_HASH;
} else {
*bp = b->next;
free_me = b;
}
WUNLOCK_HASH(lck);
erts_smp_atomic_dec_nob(&tb->common.nitems);
try_shrink(tb);
} else {
if (handle->flags & DB_MUST_RESIZE) {
db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
free_me = b;
}
if (handle->flags & DB_INC_TRY_GROW) {
int nactive;
int nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems);
WUNLOCK_HASH(lck);
nactive = NACTIVE(tb);
if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
grow(tb, nitems);
}
} else {
WUNLOCK_HASH(lck);
}
}
if (free_me)
free_term(tb, free_me);
#ifdef DEBUG
handle->dbterm = 0;
#endif
return;
}
static int db_delete_all_objects_hash(Process* p, DbTable* tbl)
{
if (IS_FIXED(tbl)) {
db_mark_all_deleted_hash(tbl);
} else {
db_free_table_hash(tbl);
db_create_hash(p, tbl);
erts_smp_atomic_set_nob(&tbl->hash.common.nitems, 0);
}
return 0;
}
void db_foreach_offheap_hash(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void * arg)
{
DbTableHash *tb = &tbl->hash;
HashDbTerm* list;
int i;
int nactive = NACTIVE(tb);
for (i = 0; i < nactive; i++) {
list = BUCKET(tb,i);
while(list != 0) {
ErlOffHeap tmp_offheap;
tmp_offheap.first = list->dbterm.first_oh;
tmp_offheap.overhead = 0;
(*func)(&tmp_offheap, arg);
list->dbterm.first_oh = tmp_offheap.first;
list = list->next;
}
}
}
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
HashDbTerm* b;
erts_smp_rwmtx_t* lck;
int sum = 0;
int sq_sum = 0;
int kept_items = 0;
int ix;
int len;
stats->min_chain_len = INT_MAX;
stats->max_chain_len = 0;
ix = 0;
lck = RLOCK_HASH(tb,ix);
do {
len = 0;
for (b = BUCKET(tb,ix); b!=NULL; b=b->next) {
len++;
if (b->hvalue == INVALID_HASH)
++kept_items;
}
sum += len;
sq_sum += len*len;
if (len < stats->min_chain_len) stats->min_chain_len = len;
if (len > stats->max_chain_len) stats->max_chain_len = len;
ix = next_slot(tb,ix,&lck);
}while (ix);
stats->avg_chain_len = (float)sum / NACTIVE(tb);
stats->std_dev_chain_len = sqrt((sq_sum - stats->avg_chain_len*sum) / NACTIVE(tb));
/* Expected standard deviation from a good uniform hash function,
ie binomial distribution (not taking the linear hashing into acount) */
stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
stats->kept_items = kept_items;
}
/* For testing only */
Eterm erts_ets_hash_sizeof_ext_segtab(void)
{
return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
}