/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1998-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
** Implementation of unordered ETS tables.
** The tables are implemented as linear dynamic hash tables.
** https://en.wikipedia.org/wiki/Linear_hashing
*/

/* SMP:
** The hash table supports two different locking "modes",
** coarse grained and fine grained locking.
**
** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
** the right kind of lock on the entire table depending on operation (reading
** or writing). No further locking is then done by the table itself.
**
** Fine grained locking is supported by this code to allow concurrent updates
** (and reading) to different parts of the table. This works by keeping one
** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
** rehashing buckets can be done without exclusive table lock.
**
** A table will support fine grained locking if it is created with flag
** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
** if operations need to obtain fine grained locks or not. Some operations
** will for example always use exclusive table lock to guarantee
** a higher level of atomicity.
*/

/* FIXATION:
** Fixating the table, by ets:safe_fixtable or as done by select-operations,
** guarantees two things in current implementation.
** (1) Keys will not *totally* disappear from the table. A key can thus be used
**     as an iterator to find the next key in iteration sequence. Note however
**     that this does not mean that (pointers to) table objects are guaranteed
**     to be maintained while the table is fixated. A BAG or DBAG may actually
**     remove objects as long as there is at least one object left in the table
**     with the same key (alive or pseudo-deleted). 
** (2) Objects will not be moved between buckets due to table grow/shrink.
**     This will guarantee that iterations do not miss keys or get double-hits.
**
** With fine grained locking, a concurrent thread can fixate the table at any
** time. A "dangerous" operation (delete or move) therefore needs to check
** if the table is fixated while write-locking the bucket.
*/

/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "export.h"
#include "erl_binary.h"

#include "erl_db_hash.h"

/* 
 * The following symbols can be manipulated to "tune" the linear hash array 
 */
/* Item count, as a function of the number of active slots, above which
 * db_put_hash() calls grow() (provided the table is not fixed). */
#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
/* Item count, as a function of the number of active slots, below which
 * try_shrink() calls shrink() (provided the table is not fixed). */
#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)

/*
** We want the first mandatory segment to be small (to reduce minimal footprint)
** and larger extra segments (to reduce number of alloc/free calls).
*/

/* Number of slots in first segment */
#define FIRST_SEGSZ_EXP  8                     /* log2 of the slot count   */
#define FIRST_SEGSZ      (1 << FIRST_SEGSZ_EXP) /* 256 slots                */
#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)      /* initial value of tb->szm */

/* Number of slots per extra segment */
#define EXT_SEGSZ_EXP  11                  /* log2 of the slot count */
#define EXT_SEGSZ   (1 << EXT_SEGSZ_EXP)   /* 2048 slots             */
#define EXT_SEGSZ_MASK (EXT_SEGSZ-1)       /* low bits used by BUCKET() */

/* Number of segment pointers that fit in the table's embedded first_segtab */
#define NSEG_1   (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
#define NSEG_2   256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */

/* Non-zero if the table type has the DB_FINE_LOCKED flag set. */
#  define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED)

/* Read the current segment table pointer of 'tb'.
 * When ETHR_ORDERED_READ_DEPEND is defined the _nob read variant is always
 * used; otherwise fine locked tables use the _ddrb read variant.
 * NOTE(review): _ddrb is presumably a data-dependency read barrier - confirm
 * against the ethread atomics API.
 */
#ifdef ETHR_ORDERED_READ_DEPEND
#define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))
#else
#define SEGTAB(tb)							\
    (DB_USING_FINE_LOCKING(tb)						\
     ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab))	\
     : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)))
#endif
/* Current number of active slots / stored items, truncated to int. */
#define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive))
#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))

/* Map a slot index to the index of the segment holding it. The bias
 * (EXT_SEGSZ-FIRST_SEGSZ) makes slots 0..FIRST_SEGSZ-1 land in segment 0
 * and every following run of EXT_SEGSZ slots land in the next segment.
 */
#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)

/* Lvalue for the bucket (chain head) of slot 'i'. The position inside the
 * segment is the low EXT_SEGSZ_EXP bits of the slot index.
 */
#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]

/*
 * When deleting a table, the number of records to delete.
 * Approximate number, because we must delete entire buckets.
 */
#define DELETE_RECORD_LIMIT 10000

/* Translate a hash value into the index of the slot that currently owns it.
** The caller must already hold the bucket lock (RLOCK_HASH or WLOCK_HASH).
**
** The hash is first masked with the current size mask (szm). If that gives
** an index at or beyond the number of active slots, the index is masked
** again with half the size mask.
*/
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
    Uint szm;
    Uint slot;

    if (DB_USING_FINE_LOCKING(tb)) {
        szm = erts_atomic_read_acqb(&tb->szm);
    }
    else {
        szm = erts_atomic_read_nob(&tb->szm);
    }

    slot = hval & szm;
    if (slot < erts_atomic_read_nob(&tb->nactive)) {
        return slot;
    }

    /* Slot not active yet: fall back to the smaller mask */
    slot &= szm >> 1;
    ASSERT(slot < erts_atomic_read_nob(&tb->nactive));
    return slot;
}


/* Allocate one FixedDeletion record on behalf of table 'tb' and add its
 * size to the ETS misc memory counter. The record is returned uninitialized.
 */
static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb)
{
    void* mem = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
                              (DbTable *) tb,
                              sizeof(FixedDeletion));

    ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
    return (FixedDeletion*) mem;
}

/* Release a FixedDeletion record obtained from alloc_fixdel() and subtract
 * its size from the ETS misc memory counter.
 */
static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd)
{
    DbTable* tbl = (DbTable*) tb;

    erts_db_free(ERTS_ALC_T_DB_FIX_DEL, tbl, fixd, sizeof(FixedDeletion));
    ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
}

/* Push 'fixd' onto the head of the table's fixdel list without locking.
 *
 * Before every attempt the fixation count is compared with 'fixated_by_me';
 * if NFIXED(tb) is not greater, an unfixer has raced us, 'fixd' is freed
 * and 0 is returned. Returns 1 once the record has been linked in.
 */
static ERTS_INLINE int link_fixdel(DbTableHash* tb,
                                   FixedDeletion* fixd,
                                   erts_aint_t fixated_by_me)
{
    erts_aint_t head = erts_atomic_read_acqb(&tb->fixdel);

    for (;;) {
        erts_aint_t seen;

        if (NFIXED(tb) <= fixated_by_me) {
            free_fixdel(tb, fixd);
            return 0; /* raced by unfixer */
        }

        /* Lockless atomic insertion in linked list: */
        fixd->next = (FixedDeletion*) head;
        seen = erts_atomic_cmpxchg_mb(&tb->fixdel,
                                      (erts_aint_t) fixd,
                                      head);
        if (seen == head) {
            return 1;
        }
        head = seen; /* list head changed under us, retry against new head */
    }
}

/* Record that slot 'ix' holds a pseudo-deleted item, so that it can be
 * purged when the table is unfixed.
 * Returns false if an unfixing thread raced us, in which case the caller
 * should delete the object for real instead.
 */
static int add_fixed_deletion(DbTableHash* tb, int ix,
                              erts_aint_t fixated_by_me)
{
    FixedDeletion* rec = alloc_fixdel(tb);

    rec->all = 0;
    rec->slot = ix;
    return link_fixdel(tb, rec, fixated_by_me);
}


/* Non-zero if the term has been marked as pseudo-deleted. */
static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p)
{
    const int flag = p->pseudo_deleted;

    return flag;
}


/* optimised version of make_hash (normal case? atomic key)
 * For atoms the hash value stored in the atom table entry is used,
 * for all other terms make_internal_hash() is called.
 * The result is masked with MAX_HASH_MASK.
 */
#define MAKE_HASH(term) \
    ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
      make_internal_hash(term, 0)) & MAX_HASH_MASK)

/* Mask selecting which of the DB_HASH_LOCK_CNT fine grained locks to use */
#  define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
/* Address of the fine grained lock covering hash value (or slot index) 'hval' */
#  define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
/* As GET_LOCK, but NULL when the table is thread safe (no fine grained
 * locking needed) */
#  define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))

/* Fine grained read lock.
 * Read-locks the lock covering 'hval' and returns it, or returns NULL
 * without locking anything when the table is marked thread safe.
 * The result is meant to be handed to RUNLOCK_HASH().
 */
static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
{
    erts_rwmtx_t* slot_lock;

    if (tb->common.is_thread_safe) {
        return NULL;
    }

    slot_lock = GET_LOCK(tb,hval);
    ASSERT(tb->common.type & DB_FINE_LOCKED);
    erts_rwmtx_rlock(slot_lock);
    return slot_lock;
}
/* Fine grained write lock.
 * Write-locks the lock covering 'hval' and returns it, or returns NULL
 * without locking anything when the table is marked thread safe.
 * The result is meant to be handed to WUNLOCK_HASH().
 */
static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
{
    erts_rwmtx_t* slot_lock;

    if (tb->common.is_thread_safe) {
        return NULL;
    }

    slot_lock = GET_LOCK(tb,hval);
    ASSERT(tb->common.type & DB_FINE_LOCKED);
    erts_rwmtx_rwlock(slot_lock);
    return slot_lock;
}

/* Release a read lock returned by RLOCK_HASH(); NULL is a no-op. */
static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck)
{
    if (lck == NULL) {
        return;
    }
    erts_rwmtx_runlock(lck);
}

/* Release a write lock returned by WLOCK_HASH(); NULL is a no-op. */
static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck)
{
    if (lck == NULL) {
        return;
    }
    erts_rwmtx_rwunlock(lck);
}


/* Lock state predicates.
 * With ERTS_ENABLE_LOCK_CHECK they query the lock checker; a thread safe
 * table (no fine grained locks in use) always satisfies the hash lock
 * predicates. Without lock checking they are all constant true.
 */
#ifdef ERTS_ENABLE_LOCK_CHECK
/* True if the table is thread safe, otherwise the value of 'cmd' */
#  define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
#  define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
/* Note: takes the lock pointer itself, not a hash value */
#  define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck))
#  define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
#else
#  define IS_HASH_RLOCKED(tb,hval) (1)
#  define IS_HASH_WLOCKED(tb,hval) (1)
#  define IS_TAB_WLOCKED(tb) (1)
#endif


/* Iteration helper
** Returns the index of the slot to visit after 'ix', or 0 when the end of
** the table (EOT) has been reached.
**
** The index is advanced by DB_HASH_LOCK_CNT, so as long as it stays below
** the number of active slots the lock in *lck_ptr is kept as is. Otherwise
** that READ lock is released, iteration wraps to index
** (ix + 1) & DB_HASH_LOCK_MASK, and, unless that is 0 (EOT), the lock for the
** new index is read-locked and stored in *lck_ptr.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
                                  erts_rwmtx_t** lck_ptr)
{
    Uint next = ix + DB_HASH_LOCK_CNT;

    if (next >= NACTIVE(tb)) {
        RUNLOCK_HASH(*lck_ptr);
        next = (next + 1) & DB_HASH_LOCK_MASK;
        if (next != 0) {
            *lck_ptr = RLOCK_HASH(tb, next);
        }
    }
    return next;
}
/* Same as next_slot but with WRITE locking:
** the lock in *lck_ptr is write-unlocked when the index passes the last
** active slot, and the lock for the wrapped index is write-locked unless
** the wrapped index is 0 (EOT).
*/
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
                                    erts_rwmtx_t** lck_ptr)
{
    Uint next = ix + DB_HASH_LOCK_CNT;

    if (next >= NACTIVE(tb)) {
        WUNLOCK_HASH(*lck_ptr);
        next = (next + 1) & DB_HASH_LOCK_MASK;
        if (next != 0) {
            *lck_ptr = WLOCK_HASH(tb, next);
        }
    }
    return next;
}



/* Free a single hash table term through db_free_term(), passing the offset
 * of the embedded dbterm inside HashDbTerm.
 */
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
    DbTable* tbl = (DbTable*) tb;

    db_free_term(tbl, p, offsetof(HashDbTerm, dbterm));
}

/* Free every term in the NULL-terminated chain starting at 'p'. */
static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p)
{
    HashDbTerm* cur;
    HashDbTerm* following;

    for (cur = p; cur != NULL; cur = following) {
        following = cur->next; /* fetch before the node is freed */
        free_term(tb, cur);
    }
}


/*
 * Local types 
 */
/* One "pre-found" bucket, as collected in struct mp_info. */
struct mp_prefound {
    HashDbTerm** bucket; /* NOTE(review): presumably points at the bucket's chain head - confirm */
    int ix;              /* NOTE(review): presumably the slot index of that bucket - confirm */
};

/* Result of analysing a match specification (filled in by analyze_pattern). */
struct mp_info {
    int something_can_match;	/* The match_spec is not "impossible" */
    int key_given;              /* NOTE(review): presumably set when the pattern binds the key - confirm */
    struct mp_prefound dlists[10];  /* Default list of "pre-found" buckets */
    struct mp_prefound* lists;   /* Buckets to search if keys are given, 
				  * = dlists initially */
    unsigned num_lists;         /* Number of elements in "lists",
				 * = 0 initially */
    Binary *mp;                 /* The compiled match program */
};

/* A table segment: an array of bucket chain heads.
 * Declared with one element, but allocated with SIZEOF_SEGMENT(N) bytes to
 * hold N buckets.
 */
struct segment {
    HashDbTerm* buckets[1];
};
/* Number of bytes needed for a segment with N buckets */
#define SIZEOF_SEGMENT(N) \
    (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))

/* An extended segment table.
 * The trailing segtab array is declared with one element; the real size is
 * given by SIZEOF_EXT_SEGTAB(NSEGS).
 */
struct ext_segtab {
    ErtsThrPrgrLaterOp lop;        /* NOTE(review): presumably used to schedule deferred deallocation - confirm */
    struct segment** prev_segtab;  /* Used when table is shrinking */
    int prev_nsegs;                /* Size of prev_segtab */
    int nsegs;                     /* Size of this segtab */
    struct segment* segtab[1];     /* The segment table */
};
/* Number of bytes needed for an ext_segtab holding NSEGS segment pointers */
#define SIZEOF_EXT_SEGTAB(NSEGS) \
    (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))


/* Publish 'segtab' as the table's current segment table.
 * Fine locked tables store it with the _wb atomic variant, all other
 * tables with the _nob variant.
 */
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
                                   struct segment** segtab)
{
    const erts_aint_t val = (erts_aint_t) segtab;

    if (!DB_USING_FINE_LOCKING(tb)) {
        erts_atomic_set_nob(&tb->segtab, val);
    }
    else {
        erts_atomic_set_wb(&tb->segtab, val);
    }
}

/* Used by select_replace on analyze_pattern.
 * Optional callback type taking the key position and the match, guard and
 * body terms. NOTE(review): the meaning of the int result is not visible
 * here - confirm at the call site in analyze_pattern.
 */
typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);

/*
** Forward decl's (static functions)
** Internal helpers defined further down in this file.
*/
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
static void alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
			     HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key, 
			       HashValue hval, HashDbTerm *list);
static void shrink(DbTableHash* tb, int nitems);
static void grow(DbTableHash* tb, int nitems);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
			   Uint sz, DbTableHash*);
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi);

/*
 *  Method interface functions
 *
 *  Static implementations that are installed in the db_hash method table
 *  below (db_check_table_hash is the exception: it only exists when
 *  HARDDEBUG is defined and is not part of the table).
 */

/* Key iteration */
static int db_first_hash(Process *p, 
			 DbTable *tbl, 
			 Eterm *ret);

static int db_next_hash(Process *p, 
			DbTable *tbl, 
			Eterm key,
			Eterm *ret);

/* Lookup */
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);

static int db_get_element_hash(Process *p, DbTable *tbl, 
			       Eterm key, int ndex, Eterm *ret);

/* Deletion of a specific object */
static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);

/* Access by slot index */
static int db_slot_hash(Process *p, DbTable *tbl, 
			Eterm slot_term, Eterm *ret);

/* Select family: each operation has a start function and a continue
 * function taking a continuation term */
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
				Eterm pattern, Sint chunk_size,
				int reverse, Eterm *ret);
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
			  Eterm pattern, int reverse, Eterm *ret);
static int db_select_continue_hash(Process *p, DbTable *tbl,
				   Eterm continuation, Eterm *ret);

static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
				Eterm pattern, Eterm *ret);
static int db_select_count_continue_hash(Process *p, DbTable *tbl,
					 Eterm continuation, Eterm *ret);

static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
				 Eterm pattern, Eterm *ret);
static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
					  Eterm continuation, Eterm *ret);

static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret);
static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
                                           Eterm continuation, Eterm *ret);

static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
/* Printing and table teardown */
static void db_print_hash(fmtfn_t to,
			  void *to_arg,
			  int show,
			  DbTable *tbl);
static int db_free_empty_table_hash(DbTable *tbl);

static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);


static void db_foreach_offheap_hash(DbTable *,
				    void (*)(ErlOffHeap *, void *),
				    void *);

static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
/* In-place update support: lookup hands out a DbUpdateHandle that is later
 * passed to finalize */
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                      DbUpdateHandle* handle);
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);

/* Shrink the table if it has grown beyond the first segment, holds fewer
 * items than SHRINK_LIMIT for its current number of active slots, and is
 * not fixed.
 */
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
    int nactive = NACTIVE(tb);
    int nitems = NITEMS(tb);

    if (nactive <= FIRST_SEGSZ) {
        return;
    }
    if (nitems >= SHRINK_LIMIT(nactive)) {
        return;
    }
    if (IS_FIXED(tb)) {
        return;
    }
    shrink(tb, nitems);
}

/* Is this a live object (not pseudo-deleted) with the specified key?
** The stored hash value is compared first, so the key terms themselves are
** only compared when the hashes agree.
*/
static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
                                    Eterm key, HashValue hval)
{
    Eterm stored_key;

    if (b->hvalue != hval) {
        return 0;
    }
    if (is_pseudo_deleted(b)) {
        return 0;
    }

    stored_key = GETKEY(tb, b->dbterm.tpl);
    ASSERT(!is_header(stored_key));
    return EQ(key, stored_key);
}

/* Does this object have the specified key? The object may be pseudo-deleted.
** The stored hash value is compared first, so the key terms themselves are
** only compared when the hashes agree.
*/
static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
                               Eterm key, HashValue hval)
{
    Eterm stored_key;

    if (b->hvalue != hval) {
        return 0;
    }

    stored_key = GETKEY(tb, b->dbterm.tpl);
    ASSERT(!is_header(stored_key));
    return EQ(key, stored_key);
}

/* Store 'obj' as a brand new table term (no old term to reuse), using the
 * compressing store function when the table has compression enabled.
 */
static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
{
    if (!tb->common.compress) {
        return db_store_term(&tb->common, NULL,
                             offsetof(HashDbTerm,dbterm), obj);
    }
    return db_store_term_comp(&tb->common, NULL,
                              offsetof(HashDbTerm,dbterm), obj);
}

/* Store 'obj' in place of the existing term 'old' (must not be NULL), using
 * the compressing store function when the table has compression enabled.
 * Returns the term now holding 'obj'; callers must use the returned pointer
 * rather than 'old'.
 */
static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
                                              Eterm obj)
{
    ASSERT(old != NULL);

    if (!tb->common.compress) {
        return db_store_term(&tb->common, &(old->dbterm),
                             offsetof(HashDbTerm,dbterm), obj);
    }
    return db_store_term_comp(&tb->common, &(old->dbterm),
                              offsetof(HashDbTerm,dbterm), obj);
}



/*
** External interface 
**
** Method table for hash tables. The initializer is positional, so the
** order of the entries must match the member order of DbTableMethod
** (declared outside this file).
*/
DbTableMethod db_hash =
{
    db_create_hash,
    db_first_hash,
    db_next_hash,
    db_first_hash,   /* last == first  */
    db_next_hash,    /* prev == next   */
    db_put_hash,
    db_get_hash,
    db_get_element_hash,
    db_member_hash,
    db_erase_hash,
    db_erase_object_hash,
    db_slot_hash,
    db_select_chunk_hash,
    db_select_hash,
    db_select_delete_hash,
    db_select_continue_hash, /* hmm continue_hash? */
    db_select_delete_continue_hash,
    db_select_count_hash,
    db_select_count_continue_hash,
    db_select_replace_hash,
    db_select_replace_continue_hash,
    db_take_hash,
    db_delete_all_objects_hash,
    db_free_empty_table_hash,
    db_free_table_continue_hash,
    db_print_hash,
    db_foreach_offheap_hash,
    db_lookup_dbterm_hash,
    db_finalize_dbterm_hash
};

#ifdef DEBUG
/* Busy-wait for a while to provoke races and get code coverage.
 * Only a real function in DEBUG builds; expands to nothing otherwise.
 */
static void DEBUG_WAIT(void)
{
    unsigned long remaining = 1UL << 20;

    do {
        --remaining;
    } while (remaining != 0);
}
#else
#  define DEBUG_WAIT()
#endif

/* Rare case of restoring the rest of the fixdel list
   when "unfixer" gets interrupted by "fixer".

   'fixdel' is the head of the not yet processed part of the list that
   db_unfix_table_hash() detached from tb->fixdel. It is put back without
   locking:
   - if tb->fixdel is still NULL, a single compare-and-exchange installs
     'fixdel' as the list head;
   - otherwise new records have been linked in meanwhile, and the current
     list is appended after the last record of 'fixdel', retrying the
     compare-and-exchange until the head was unchanged. */
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
    /*int tries = 0;*/
    DEBUG_WAIT();
    /* Fast path: list head is still NULL, just install our list */
    if (erts_atomic_cmpxchg_relb(&tb->fixdel,
				     (erts_aint_t) fixdel,
				     (erts_aint_t) NULL) != (erts_aint_t) NULL) {
	/* Oboy, must join lists */    
	FixedDeletion* last = fixdel;
	erts_aint_t was_tail;
	erts_aint_t exp_tail;

	/* Find the last record of our own list */
	while (last->next != NULL) last = last->next;
	was_tail = erts_atomic_read_acqb(&tb->fixdel);
	do { /* Lockless atomic list insertion */
	    exp_tail = was_tail;
	    /* Current shared list becomes the tail of our list ... */
	    last->next = (FixedDeletion*) exp_tail;
	    /*++tries;*/
	    DEBUG_WAIT();
	    /* ... and our head becomes the shared head, if nobody raced us */
	    was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel,
						    (erts_aint_t) fixdel,
						    exp_tail);
	}while (was_tail != exp_tail);
    }
    /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
/*
** Table interface routines, i.e. what is called by the BIFs
*/

/* Purge pseudo-deleted objects once the table is no longer fixed.
 *
 * Detaches the whole list of FixedDeletion records from tb->fixdel and, for
 * every recorded slot, unlinks and frees all pseudo-deleted terms in that
 * bucket while holding the bucket's write lock. A record with 'all' set is
 * applied to its slot and every lower slot down to 0.
 *
 * If the table is found to be fixed again while processing, the remaining
 * records are handed back with restore_fixdel(); processing restarts from
 * scratch if the table got unfixed again in the meantime, otherwise the
 * function returns early.
 *
 * Returns the amount of work done: one unit per freed term plus one per
 * processed FixedDeletion record.
 */
SWord db_unfix_table_hash(DbTableHash *tb)
{
    FixedDeletion* fixdel;
    SWord work = 0;

    /* Caller must hold the table lock exclusively, or hold it shared on a
     * table that uses fine grained locking */
    ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
                   || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock)
                       && !tb->common.is_thread_safe));
restart:
    /* Atomically take ownership of the entire list */
    fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel,
                                                  (erts_aint_t) NULL);
    while (fixdel) {
        FixedDeletion *free_me;

        do {
            HashDbTerm **bp;
            HashDbTerm *b;
            HashDbTerm *free_us = NULL;
            erts_rwmtx_t* lck;

            lck = WLOCK_HASH(tb, fixdel->slot);

            if (IS_FIXED(tb)) { /* interrupted by fixer */
                WUNLOCK_HASH(lck);
                restore_fixdel(tb,fixdel);
                if (!IS_FIXED(tb)) {
                    goto restart; /* unfixed again! */
                }
                return work;
            }
            if (fixdel->slot < NACTIVE(tb)) {
                bp = &BUCKET(tb, fixdel->slot);
                b = *bp;

                /* Unlink every pseudo-deleted term and collect them on the
                 * private free_us list; live terms stay in the chain */
                while (b != NULL) {
                    if (is_pseudo_deleted(b)) {
                        HashDbTerm* nxt = b->next;
                        b->next = free_us;
                        free_us = b;
                        work++;
                        b = *bp = nxt;
                    } else {
                        bp = &b->next;
                        b = b->next;
                    }
                }
            }
            /* else slot has been joined and purged by shrink() */
            WUNLOCK_HASH(lck);
            /* Free outside the bucket lock */
            free_term_list(tb, free_us);

        }while (fixdel->all && fixdel->slot-- > 0);

        free_me = fixdel;
        fixdel = fixdel->next;
        free_fixdel(tb, free_me);
        work++;
    }

    /* ToDo: Maybe try grow/shrink the table as well */

    return work;
}

/* Initialize the hash specific part of a newly created table.
 *
 * Sets up the size mask, active slot count and fixdel list, installs the
 * embedded first segment table, and allocates and zeroes the first segment
 * of FIRST_SEGSZ buckets. For tables of type DB_FINE_LOCKED the vector of
 * DB_HASH_LOCK_CNT rw-mutexes is allocated and initialized; otherwise
 * tb->locks is set to NULL.
 *
 * Always returns DB_ERROR_NONE.
 */
int db_create_hash(Process *p, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    struct segment* first_seg;

    erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
    erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
    erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
    erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
    SET_SEGTAB(tb, tb->first_segtab);
    tb->nsegs = NSEG_1;
    tb->nslots = FIRST_SEGSZ;

    /* The first (small) segment is mandatory and starts out empty */
    first_seg = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                                (DbTable *) tb,
                                                SIZEOF_SEGMENT(FIRST_SEGSZ));
    tb->first_segtab[0] = first_seg;
    sys_memset(first_seg, 0, SIZEOF_SEGMENT(FIRST_SEGSZ));

    erts_atomic_init_nob(&tb->is_resizing, 0);

    if (!(tb->common.type & DB_FINE_LOCKED)) {
        /* coarse locking */
        tb->locks = NULL;
    }
    else {
        erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
        int lock_ix = 0;

        if (tb->common.type & DB_FREQ_READ) {
            rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
        }
        if (erts_ets_rwmtx_spin_count >= 0) {
            rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
        }
        tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */
                                                          (DbTable *) tb,
                                                          sizeof(DbTableHashFineLocks));
        while (lock_ix < DB_HASH_LOCK_CNT) {
            erts_rwmtx_init_opt(&tb->locks->lck_vec[lock_ix].lck, &rwmtx_opt,
                "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
            ++lock_ix;
        }
        /* This important property is needed to guarantee that the two
         * buckets involved in a grow/shrink operation are protected by the
         * same lock:
         */
        ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
    }
    ERTS_THR_MEMORY_BARRIER;
    return DB_ERROR_NONE;
}

/* Return a copy of the first live key in iteration order, starting the
 * search at slot 0, or am_EOT in *ret if the table holds no live object.
 *
 * The read lock is released here only when an object was found.
 * NOTE(review): when next_live() returns NULL the lock has presumably
 * already been released by the iteration helpers - confirm in next_live().
 */
static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Uint slot = 0;
    erts_rwmtx_t* lck = RLOCK_HASH(tb, slot);
    HashDbTerm* first = next_live(tb, &slot, &lck, BUCKET(tb, slot));

    if (first == NULL) {
        *ret = am_EOT;
        return DB_ERROR_NONE;
    }

    *ret = db_copy_key(p, tbl, &first->dbterm);
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}


/* Return a copy of the key following 'key' in iteration order.
 *
 * 'key' must be present in the table (it may be pseudo-deleted), otherwise
 * DB_ERROR_BADKEY is returned. For bag and duplicate_bag tables all further
 * live objects with the same key are skipped, so a different key is
 * returned. *ret is set to am_EOT when there is no following live object.
 *
 * The read lock is released here on the BADKEY path and when a next object
 * was found.
 * NOTE(review): at EOT the lock has presumably already been released by
 * next_live() - confirm there.
 */
static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval = MAKE_HASH(key);
    erts_rwmtx_t* lck = RLOCK_HASH(tb,hval);
    Uint ix = hash_to_ix(tb, hval);
    HashDbTerm* cur = BUCKET(tb, ix);

    /* Locate the given key in its bucket */
    while (cur != NULL && !has_key(tb, cur, key, hval)) {
        cur = cur->next;
    }
    if (cur == NULL) {
        RUNLOCK_HASH(lck);
        return DB_ERROR_BADKEY;
    }

    /* Key found, advance to the next live object */
    cur = next_live(tb, &ix, &lck, cur->next);
    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
        /* Skip remaining objects sharing the same key */
        while (cur != NULL && has_live_key(tb, cur, key, hval)) {
            cur = next_live(tb, &ix, &lck, cur->next);
        }
    }

    if (cur != NULL) {
        *ret = db_copy_key(p, tbl, &cur->dbterm);
        RUNLOCK_HASH(lck);
    }
    else {
        *ret = am_EOT;
    }
    return DB_ERROR_NONE;
}

/* Insert 'obj' into the table.
 *
 * The key is extracted from the tuple, hashed, and the owning bucket is
 * write-locked while the chain is searched and modified.
 *
 *  - Key not present: a new term is linked in (label Lnew), the item count
 *    is incremented and, after the bucket lock is released, the table is
 *    grown if the item count exceeds GROW_LIMIT and the table is not fixed.
 *  - DB_SET with key present: the existing term is replaced in place; a
 *    pseudo-deleted term is revived (and counted as an item again).
 *  - DB_BAG with key present: if an equal object already exists nothing is
 *    inserted; a pseudo-deleted equal object is revived and moved first
 *    among the objects with that key.
 *  - Otherwise (duplicate_bag): a new term is always linked in.
 *
 * If 'key_clash_fail' is non-zero, DB_ERROR_BADKEY is returned instead of
 * storing when a live object with the key already exists.
 * Returns DB_ERROR_NONE on success.
 */
int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    Eterm key;
    HashDbTerm** bp;
    HashDbTerm* b;
    HashDbTerm* q;
    erts_rwmtx_t* lck;
    int nitems;
    int ret = DB_ERROR_NONE;

    key = GETKEY(tb, tuple_val(obj));
    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    /* Find the first term with this key (live or pseudo-deleted).
     * bp tracks the link pointing at b so that b can be replaced. */
    for (;;) {
	if (b == NULL) {
	    goto Lnew;
	}
	if (has_key(tb,b,key,hval)) {
	    break;
	}
	bp = &b->next;
	b = b->next;
    }
    /* Key found
    */
    if (tb->common.status & DB_SET) {
	HashDbTerm* bnext = b->next;
	if (is_pseudo_deleted(b)) {
	    /* Revive: the pseudo-deleted term was not counted as an item */
	    erts_atomic_inc_nob(&tb->common.nitems);
            b->pseudo_deleted = 0;
	}
	else if (key_clash_fail) {
	    ret = DB_ERROR_BADKEY;
	    goto Ldone;
	}
	/* replace_dbterm may return a different term than b, so the chain
	 * links are re-established around the returned term */
	q = replace_dbterm(tb, b, obj);
	q->next = bnext;
	ASSERT(q->hvalue == hval);
	*bp = q;
	goto Ldone;
    }
    else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
	/* Fail if any object with this key is live */
	q = b;
	do {
	    if (!is_pseudo_deleted(q)) {
		ret = DB_ERROR_BADKEY;
		goto Ldone;
	    }
	    q = q->next;
	}while (q != NULL && has_key(tb,q,key,hval)); 	
    }
    else if (tb->common.status & DB_BAG) {
	/* A bag holds no duplicates: look for an equal object */
	HashDbTerm** qp = bp;
	q = b;
	do {
	    if (db_eq(&tb->common,obj,&q->dbterm)) {
		if (is_pseudo_deleted(q)) {
		    erts_atomic_inc_nob(&tb->common.nitems);
                    q->pseudo_deleted = 0;
		    ASSERT(q->hvalue == hval);
		    if (q != b) { /* must move to preserve key insertion order */
			*qp = q->next;
			q->next = b;
			*bp = q;
		    }
		}
		goto Ldone;
	    }
	    qp = &q->next;
	    q = *qp;
	}while (q != NULL && has_key(tb,q,key,hval)); 
    }
    /*else DB_DUPLICATE_BAG */

Lnew:
    /* Link the new term in front of b (b is NULL when the key was absent) */
    q = new_dbterm(tb, obj);
    q->hvalue = hval;
    q->pseudo_deleted = 0;
    q->next = b;
    *bp = q;
    nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
    WUNLOCK_HASH(lck);
    {
	/* Growing is attempted after the bucket lock has been released */
	int nactive = NACTIVE(tb);       
	if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
	    grow(tb, nitems);
	}
    }
    return DB_ERROR_NONE;

Ldone:
    WUNLOCK_HASH(lck);	
    return ret;
}

/* Build the list of objects with key 'key', starting at term 'b1'.
 *
 * For bag and duplicate_bag tables the chain is scanned past all following
 * terms with the same key; 'sz' accumulates the size of b1 and of every
 * following term that is not pseudo-deleted, plus 2 words per term. For
 * other table types only b1 is included.
 *
 * If 'bend' is not NULL, it receives the first term after the included
 * range. Returns the term produced by build_term_list().
 */
static Eterm
get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
              HashDbTerm *b1, HashDbTerm **bend)
{
    HashDbTerm* end = b1->next;
    Uint total = b1->dbterm.size + 2;
    Eterm copy;

    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
        for (; end != NULL && has_key(tb, end, key, hval); end = end->next) {
            if (is_pseudo_deleted(end)) {
                continue;
            }
            total += end->dbterm.size + 2;
        }
    }

    copy = build_term_list(p, b1, end, total, tb);
    if (bend != NULL) {
        *bend = end;
    }
    return copy;
}

/* Look up all live objects with the given key.
 *
 * *ret is set to the list built by get_term_list() from the first live
 * object with the key, or to NIL when there is no such object.
 * The bucket is read-locked for the duration of the lookup.
 * Always returns DB_ERROR_NONE.
 */
int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval = MAKE_HASH(key);
    erts_rwmtx_t* lck = RLOCK_HASH(tb,hval);
    int ix = hash_to_ix(tb, hval);
    HashDbTerm* cur;
    Eterm result = NIL;

    for (cur = BUCKET(tb, ix); cur != NULL; cur = cur->next) {
        if (has_live_key(tb, cur, key, hval)) {
            result = get_term_list(p, tb, key, hval, cur, NULL);
            break;
        }
    }

    *ret = result;
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}
    
/* Test whether the table contains a live (not pseudo-deleted) object with
 * the given key.
 *
 * *ret is set to am_true if such an object exists, otherwise am_false.
 * Always returns DB_ERROR_NONE.
 *
 * The bucket read lock MUST be taken before hash_to_ix() is called: with
 * fine grained locking the table can grow or shrink concurrently, and a
 * slot index computed without the lock may already be stale when the
 * bucket is finally read, making the search look in the wrong bucket.
 */
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_rwmtx_t* lck;

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);   /* lock first ... */
    ix = hash_to_ix(tb, hval);    /* ... then map hash to slot */
    b1 = BUCKET(tb, ix);

    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    *ret = am_true;
	    goto done;
	}
	b1 = b1->next;
    }
    *ret = am_false;
done:
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}
    
/* Fetch element number 'ndex' of the object(s) stored under 'key'.
 *
 * For bag and duplicate_bag tables *ret becomes a list with element 'ndex'
 * of every live object with the key; for other tables it is the element of
 * the single object.
 *
 * Returns DB_ERROR_NONE on success, DB_ERROR_BADKEY if there is no live
 * object with the key, and DB_ERROR_BADITEM if 'ndex' exceeds the arity of
 * the first live object or of any later live object with the key.
 * The bucket is read-locked for the duration of the call.
 */
static int db_get_element_hash(Process *p, DbTable *tbl, 
			       Eterm key,
			       int ndex, 
			       Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_rwmtx_t* lck;
    int retval;
    
    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    b1 = BUCKET(tb, ix);


    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    /* b1 is the first live object with the key */
	    if (ndex > arityval(b1->dbterm.tpl[0])) {
		retval = DB_ERROR_BADITEM;
		goto done;
	    }
	    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
		HashDbTerm* b;
		HashDbTerm* b2 = b1->next;
		Eterm elem_list = NIL;

		/* First pass: validate the arity of every live object with
		 * this key before anything is copied; b2 ends up at the
		 * first term after the range */
		while(b2 != NULL && has_key(tb,b2,key,hval)) {
		    if (ndex > arityval(b2->dbterm.tpl[0])
			&& !is_pseudo_deleted(b2)) {
			retval = DB_ERROR_BADITEM;
			goto done;
		    }
		    b2 = b2->next;
		}
		/* Second pass: copy the element of every live object,
		 * consing each one onto the front of the result list
		 * (2 extra heap words are requested for the cons cell) */
		b = b1;
		while(b != b2) {
		    if (!is_pseudo_deleted(b)) {
			Eterm *hp;
			Eterm copy = db_copy_element_from_ets(&tb->common, p,
							      &b->dbterm, ndex, &hp, 2);
			elem_list = CONS(hp, copy, elem_list);
		    }
		    b = b->next;
		}
		*ret = elem_list;
	    }
	    else {
		Eterm* hp;
		*ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
	    }
	    retval = DB_ERROR_NONE;
	    goto done;
	}
	b1 = b1->next;
    }
    retval = DB_ERROR_BADKEY;
done:
    RUNLOCK_HASH(lck);
    return retval;
}

/*
** NB, this is for the db_erase/2 bif.
**
** Remove every live object with the given key.
**
** The bucket is write-locked while the chain is walked. If the table is
** fixed and a fixed deletion could be registered, the first matching object
** is only marked pseudo-deleted and left in the chain; all other matching
** objects are unlinked and collected on a private list. The walk stops at
** the first non-matching object that is not pseudo-deleted once at least
** one object has been removed.
**
** After the lock is released the item count is adjusted, a shrink is
** attempted and the unlinked terms are freed.
** *ret is always set to am_true and DB_ERROR_NONE is always returned.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm** bp;
    HashDbTerm* b;
    HashDbTerm* free_us = NULL;
    erts_rwmtx_t* lck;
    int nitems_diff = 0;    /* negated number of removed objects */

    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    while(b != 0) {
	if (has_live_key(tb,b,key,hval)) {
	    --nitems_diff;
	    if (nitems_diff == -1 && IS_FIXED(tb)
                && add_fixed_deletion(tb, ix, 0)) {
		/* Pseudo remove (no need to keep several of same key) */
		b->pseudo_deleted = 1;
	    } else {
		/* Unlink b and push it on the free list; bp is unchanged
		 * and now refers to the successor */
		HashDbTerm* next = b->next;
                b->next = free_us;
		free_us = b;
		b = *bp = next;
		continue;
	    }
	}
	else {
	    /* Past the objects with our key: nothing more to remove */
	    if (nitems_diff && !is_pseudo_deleted(b))
		break;
	}
	bp = &b->next;
	b = b->next;
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
	erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
	try_shrink(tb);
    }
    /* Terms are freed after the bucket lock has been released */
    free_term_list(tb, free_us);
    *ret = am_true;
    return DB_ERROR_NONE;
}    

/*
** This is for the ets:delete_object BIF
**
** Remove the object(s) equal to 'object'.
**
** The key is taken from 'object' and the bucket is write-locked while the
** chain is walked. A matching object is only marked pseudo-deleted when it
** is the first live object seen with the key, the table is fixed and a
** fixed deletion could be registered; otherwise it is unlinked and
** collected on a private list. For duplicate_bag tables the walk continues
** to remove further equal objects, for other table types it stops after
** the first one.
**
** After the lock is released the item count is adjusted, a shrink is
** attempted and the unlinked terms are freed.
** *ret is always set to am_true and DB_ERROR_NONE is always returned.
*/
static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm** bp;
    HashDbTerm* b;
    HashDbTerm* free_us = NULL;
    erts_rwmtx_t* lck;
    int nitems_diff = 0;    /* negated number of removed objects */
    int nkeys = 0;          /* live objects with the key seen so far */
    Eterm key;

    key = GETKEY(tb, tuple_val(object));
    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    while(b != 0) {
	if (has_live_key(tb,b,key,hval)) {
	    ++nkeys;
	    if (db_eq(&tb->common,object, &b->dbterm)) {
		--nitems_diff;
		if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) {
		    /* Keep the term in the chain, only flag it */
		    b->pseudo_deleted = 1;
		    bp = &b->next;
		    b = b->next;
		} else {
		    /* Unlink b and push it on the free list */
                    HashDbTerm* next = b->next;
		    b->next = free_us;
                    free_us = b;
		    b = *bp = next;
		}
		/* b/bp have already been advanced in both branches above */
		if (tb->common.status & (DB_DUPLICATE_BAG)) {
		    continue;
		} else {
		    break;
		}
	    }
	}
	else if (nitems_diff && !is_pseudo_deleted(b)) {
	    /* Past the objects with our key: nothing more to remove */
	    break;
	}
	bp = &b->next;
	b = b->next;
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
	erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
	try_shrink(tb);
    }
    /* Terms are freed after the bucket lock has been released */
    free_term_list(tb, free_us);
    *ret = am_true;
    return DB_ERROR_NONE;
}    


/* Return the contents of the bucket with slot index 'slot_term'.
 *
 * 'slot_term' must be a non-negative small integer, otherwise
 * DB_ERROR_BADPARAM is returned. For an index below the number of active
 * slots *ret becomes the list built from that bucket; an index equal to
 * the number of active slots yields am_EOT; anything larger is
 * DB_ERROR_BADPARAM.
 */
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    erts_rwmtx_t* lck;
    Sint slot;
    int nactive;
    int retval;

    if (is_not_small(slot_term)) {
        return DB_ERROR_BADPARAM;
    }
    slot = signed_val(slot_term);
    if (slot < 0) {
        return DB_ERROR_BADPARAM;
    }

    lck = RLOCK_HASH(tb, slot);
    nactive = NACTIVE(tb);
    if (slot > nactive) {
        retval = DB_ERROR_BADPARAM;
    }
    else if (slot == nactive) {
        *ret = am_EOT;
        retval = DB_ERROR_NONE;
    }
    else {
        *ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb);
        retval = DB_ERROR_NONE;
    }
    RUNLOCK_HASH(lck);
    return retval;
}


/*
 * Match traversal callbacks
 */

/*
 * Set of callbacks driving a match traversal (match_traverse and
 * match_traverse_continue). Operation specific context structs embed this
 * struct as their first member, so the 'ctx' pointer handed to each callback
 * can be cast back to the enclosing context type.
 */
typedef struct match_callbacks_t_ match_callbacks_t;
struct match_callbacks_t_
{
/* Called when no match is possible.
 *      ctx: Pointer to context
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 */
    int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret);

/* Called for each term the match program was run against
 * (pseudo-deleted terms are skipped by the traversal).
 *      ctx: Pointer to context
 *      slot_ix: Current slot index
 *      current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
 *                       can be (carefully) used to adjust iteration when deleting or replacing elements.
 *      match_res: The result of running the match program against the current term.
 *
 * Should return 1 for successful match, 0 otherwise.
 * A non-zero return increments the traversal's 'got' counter.
 */
    int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix,
                        HashDbTerm*** current_ptr_ptr, Eterm match_res);

/* Called when either we've matched enough elements in this cycle or EOT was reached.
 *      ctx: Pointer to context
 *      slot_ix: Current slot index (the traversal passes -1 once the
 *               end of the table has been reached)
 *      got: How many elements have been matched so far
 *      iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
 *      mpp: Double pointer to the compiled match program
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
 */
    int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got,
                         Sint iterations_left, Binary** mpp, Eterm* ret);

/* Called when it's time to trap
 *      ctx: Pointer to context
 *      slot_ix: Slot index to resume the traversal from
 *      got: How many elements have been matched so far
 *      mpp: Double pointer to the compiled match program
 *      ret: Pointer to traversal function term return.
 *
 * Both the direct return value and 'ret' are used as the traversal function return values.
 * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
 */
    int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp,
                   Eterm* ret);

};


/*
 * Begin hash table match traversal
 *
 * Analyzes 'pattern' and then walks the table, running the resulting match
 * program against every term that is not pseudo-deleted and reporting each
 * result through ctx->on_match_res. Exactly one of on_nothing_can_match,
 * on_loop_ended or on_trap is invoked before returning, unless pattern
 * analysis fails, in which case *ret is set to NIL and the analysis error
 * is returned.
 *
 * If the pattern binds the key, only the buckets found by analyze_pattern
 * (mpi.lists) are visited; otherwise all slots are visited starting at 0.
 *
 * The match program is freed here unless a callback has set it to NULL.
 */
static int match_traverse(Process* p, DbTableHash* tb,
                          Eterm pattern,
                          extra_match_validator_t extra_match_validator, /* Optional */
                          Sint chunk_size,      /* If 0, no chunking */
                          Sint iterations_left, /* Nr. of iterations left */
                          Eterm** hpp,          /* Heap */
                          int lock_for_write,   /* Set to 1 if we're going to delete or
                                                   modify existing terms */
                          match_callbacks_t* ctx,
                          Eterm* ret)
{
    Sint slot_ix;                  /* Slot index */
    HashDbTerm** current_ptr;      /* Refers to either the bucket pointer or
                                    * the 'next' pointer in the previous term
                                    */
    HashDbTerm* saved_current;     /* Helper to avoid double skip on match */
    struct mp_info mpi;
    unsigned current_list_pos = 0; /* Prefound buckets list index */
    Eterm match_res;
    Sint got = 0;                  /* Matched terms counter */
    erts_rwmtx_t* lck;         /* Slot lock */
    int ret_value;
    /* Lock, unlock and slot stepping primitives matching the access mode */
    erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
        = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
    void (*unlock_hash_function)(erts_rwmtx_t*)
        = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
    Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**)
        = (lock_for_write ? next_slot_w : next_slot);

    if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
            != DB_ERROR_NONE)
    {
        *ret = NIL;
        goto done;
    }

    if (!mpi.something_can_match) {
        /* Can't possibly match anything */
        ret_value = ctx->on_nothing_can_match(ctx, ret);
        goto done;
    }

    /*
     * Look for initial slot / bucket
     */
    if (!mpi.key_given) {
        /* Run this code if pattern is variable or GETKEY(pattern)  */
        /* is a variable                                            */
        slot_ix = 0;
        lck = lock_hash_function(tb,slot_ix);
        for (;;) {
            ASSERT(slot_ix < NACTIVE(tb));
            if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
                break;
            }
            slot_ix = next_slot_function(tb,slot_ix,&lck);
            if (slot_ix == 0) {
                /* No non-empty bucket in the whole table.
                 * NOTE(review): a zero return presumably means the end of
                 * the table was passed and the lock released -- confirm
                 * against next_slot / next_slot_w. */
                ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left,
                                               &mpi.mp, ret);
                goto done;
            }
        }
    } else {
        /* We have at least one */
        slot_ix = mpi.lists[current_list_pos].ix;
        lck = lock_hash_function(tb, slot_ix);
        current_ptr = mpi.lists[current_list_pos].bucket;
        ASSERT(*current_ptr == BUCKET(tb,slot_ix));
        ++current_list_pos;
    }

    /*
     * Execute traversal cycle
     */
    for(;;) {
        if (*current_ptr != NULL) {
            if (!is_pseudo_deleted(*current_ptr)) {
                match_res = db_match_dbterm(&tb->common, p, mpi.mp,
                                            &(*current_ptr)->dbterm, hpp, 2);
                saved_current = *current_ptr;
                if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
                    ++got;
                }
                --iterations_left;
                if (*current_ptr != saved_current) {
                    /* Don't advance to next, the callback did it already */
                    continue;
                }
            }
            current_ptr = &((*current_ptr)->next);
        }
        else if (mpi.key_given) {  /* Key is bound */
            /* End of this bucket chain; move on to the next prefound bucket */
            unlock_hash_function(lck);
            if (current_list_pos == mpi.num_lists) {
                ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret);
                goto done;
            } else {
                slot_ix = mpi.lists[current_list_pos].ix;
                lck = lock_hash_function(tb, slot_ix);
                current_ptr = mpi.lists[current_list_pos].bucket;
                ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
                ++current_list_pos;
            }
        }
        else { /* Key is variable */
            /* End of this bucket chain; step to the next slot. The chunk and
             * trap checks are only made here, on slot boundaries. */
            if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
                slot_ix = -1;
                break;
            }
            if (chunk_size && got >= chunk_size) {
                unlock_hash_function(lck);
                break;
            }
            if (iterations_left <= 0 || MBUF(p)) {
                /*
                 * We have either reached our limit, or just created some heap fragments.
                 * Since many heap fragments will make the GC slower, trap and GC now.
                 */
                unlock_hash_function(lck);
                ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret);
                goto done;
            }
            current_ptr = &BUCKET(tb,slot_ix);
        }
    }

    ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret);

done:
    /* We should only jump directly to this label if
     * we've already called ctx->nothing_can_match / loop_ended / trap
     */
    if (mpi.mp != NULL) {
        /* Still owned by us: no callback took over the match program */
        erts_bin_free(mpi.mp);
    }
    if (mpi.lists != mpi.dlists) {
        /* The bucket list is not the array embedded in mpi, so release it */
        erts_free(ERTS_ALC_T_DB_SEL_LIST,
                (void *) mpi.lists);
    }
    return ret_value;

}

/*
 * Continue hash table match traversal
 *
 * Resumes a traversal at the start of slot 'slot_ix' using an already
 * compiled match program (*mpp) and a running match count 'got'.
 *
 * Returns DB_ERROR_BADPARAM (with *ret = NIL) if 'got' is negative or if
 * 'slot_ix' is not below the number of active slots. Otherwise the result
 * comes from ctx->on_loop_ended or ctx->on_trap. Unlike match_traverse,
 * this function never frees the match program itself.
 */
static int match_traverse_continue(Process* p, DbTableHash* tb,
                                   Sint chunk_size,      /* If 0, no chunking */
                                   Sint iterations_left, /* Nr. of iterations left */
                                   Eterm** hpp,          /* Heap */
                                   Sint slot_ix,         /* Slot index to resume traversal from */
                                   Sint got,             /* Matched terms counter */
                                   Binary** mpp,         /* Existing match program */
                                   int lock_for_write,   /* Set to 1 if we're going to delete or
                                                            modify existing terms */
                                   match_callbacks_t* ctx,
                                   Eterm* ret)
{
    HashDbTerm** current_ptr;  /* Refers to either the bucket pointer or
                                       * the 'next' pointer in the previous term
                                       */
    HashDbTerm* saved_current; /* Helper to avoid double skip on match */
    Eterm match_res;
    erts_rwmtx_t* lck;
    int ret_value;
    /* Lock, unlock and slot stepping primitives matching the access mode */
    erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
        = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
    void (*unlock_hash_function)(erts_rwmtx_t*)
        = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
    Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr)
        = (lock_for_write ? next_slot_w : next_slot);

    if (got < 0) {
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    if (slot_ix < 0 /* EOT */
       || (chunk_size && got >= chunk_size))
    {
        /* Already got all or enough in the match_list */
        ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);
        goto done;
    }

    lck = lock_hash_function(tb, slot_ix);
    if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
        unlock_hash_function(lck);
        *ret = NIL;
        ret_value = DB_ERROR_BADPARAM;
        goto done;
    }

    /*
     * Resume traversal cycle from where we left
     */
    current_ptr = &BUCKET(tb,slot_ix);
    for(;;) {
        if (*current_ptr != NULL) {
            if (!is_pseudo_deleted(*current_ptr)) {
                match_res = db_match_dbterm(&tb->common, p, *mpp,
                                            &(*current_ptr)->dbterm, hpp, 2);
                saved_current = *current_ptr;
                if (ctx->on_match_res(ctx, slot_ix, &current_ptr, match_res)) {
                    ++got;
                }
                --iterations_left;
                if (*current_ptr != saved_current) {
                    /* Don't advance to next, the callback did it already */
                    continue;
                }
            }
            current_ptr = &((*current_ptr)->next);
        }
        else {
            /* End of this bucket chain; step to the next slot. The chunk and
             * trap checks are only made here, on slot boundaries. */
            if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
                /* Report end of table to on_loop_ended as slot -1 */
                slot_ix = -1;
                break;
            }
            if (chunk_size && got >= chunk_size) {
                unlock_hash_function(lck);
                break;
            }
            if (iterations_left <= 0 || MBUF(p)) {
                /*
                 * We have either reached our limit, or just created some heap fragments.
                 * Since many heap fragments will make the GC slower, trap and GC now.
                 */
                unlock_hash_function(lck);
                ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret);
                goto done;
            }
            current_ptr = &BUCKET(tb,slot_ix);
        }
    }

    ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret);

done:
    /* We should only jump directly to this label if
     * we've already called on_loop_ended / on_trap
     */
    return ret_value;

}


/*
 * Common traversal trapping/continuation code;
 * used by select_count, select_delete and select_replace,
 * as well as their continuation-handling counterparts.
 */

/*
 * Build the continuation {Tid, SlotIx, MatchProgRef, Got} and prepare a trap
 * to 'trap_function' with it as the only argument.
 *
 * prev_continuation_tptr is NULL the first time an operation traps. In that
 * case a magic reference to the match program is created and *mpp is set to
 * NULL so that the caller does not free the program. On later traps the
 * match program reference is taken from element 3 of the previous
 * continuation tuple.
 *
 * Consumes all remaining reductions of 'p'. Always returns DB_ERROR_NONE.
 */
static ERTS_INLINE int on_simple_trap(Export* trap_function,
                                      Process* p,
                                      DbTableHash* tb,
                                      Eterm tid,
                                      Eterm* prev_continuation_tptr,
                                      Sint slot_ix,
                                      Sint got,
                                      Binary** mpp,
                                      Eterm* ret)
{
    const int first_trap = (prev_continuation_tptr == NULL);
    size_t heap_words = 5; /* room for the 4-tuple continuation */
    Eterm* hp;
    Eterm got_term;
    Eterm prog_ref;
    Eterm cont;

    if (first_trap) {
        /* The match program reference is created on this heap block too */
        heap_words += ERTS_MAGIC_REF_THING_SIZE;
    }

    BUMP_ALL_REDS(p);
    if (!IS_USMALL(0, got)) {
        /* The counter does not fit in a small; store it as a bignum */
        hp = HAllocX(p, heap_words + BIG_UINT_HEAP_SIZE,
                     ERTS_MAGIC_REF_THING_SIZE);
        got_term = uint_to_big(got, hp);
        hp += BIG_UINT_HEAP_SIZE;
    }
    else {
        hp = HAllocX(p, heap_words, ERTS_MAGIC_REF_THING_SIZE);
        got_term = make_small(got);
    }

    if (!first_trap) {
        ASSERT(!is_atom(tid));
        prog_ref = prev_continuation_tptr[3];
    }
    else {
        if (is_atom(tid)) {
            tid = erts_db_make_tid(p, &tb->common);
        }
        prog_ref = erts_db_make_match_prog_ref(p, *mpp, &hp);
        *mpp = NULL; /* otherwise the caller will destroy it */
    }

    cont = TUPLE4(hp, tid, make_small(slot_ix), prog_ref, got_term);
    ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, cont);
    return DB_ERROR_NONE;
}

/*
 * Decode a {Tid, SlotIx, MatchProgRef, Got} continuation tuple, the shape
 * built by on_simple_trap.
 *
 * Returns 1 without touching the output parameters if the tuple does not
 * have arity 4, if SlotIx is not a small, or if Got is neither a small nor
 * a bignum. Returns 0 on success. The match program reference is fetched
 * with the unchecked accessor, i.e. element 3 is not validated here.
 */
static ERTS_INLINE int unpack_simple_continuation(Eterm continuation,
                                                  Eterm** tptr_ptr,
                                                  Eterm* tid_ptr,
                                                  Sint* slot_ix_p,
                                                  Binary** mpp,
                                                  Sint* got_p)
{
    Eterm* elems;

    ASSERT(is_tuple(continuation));
    elems = tuple_val(continuation);

    if (arityval(elems[0]) != 4) {
        return 1;
    }
    if (!is_small(elems[2])) {
        return 1;
    }
    if (!is_big(elems[4]) && !is_small(elems[4])) {
        return 1;
    }

    *tptr_ptr = elems;
    *tid_ptr = elems[1];
    *slot_ix_p = unsigned_val(elems[2]);
    *mpp = erts_db_get_match_prog_binary_unchecked(elems[3]);
    if (!is_big(elems[4])) {
        *got_p = unsigned_val(elems[4]);
    }
    else {
        *got_p = big_to_uint32(elems[4]);
    }
    return 0;
}


/*
 *
 * select / select_chunk match traversal
 *
 */

/* Iteration budget handed to the traversal for each select / select_chunk
 * call; also the base from which consumed reductions are computed. */
#define MAX_SELECT_CHUNK_ITERATIONS 1000

/* Traversal context for select, select_chunk and select_continue */
typedef struct {
    match_callbacks_t base;        /* Must be first: callbacks cast it back to this type */
    Process* p;                    /* Calling process */
    DbTableHash* tb;               /* Table being traversed */
    Eterm tid;                     /* Table identifier stored in continuations */
    Eterm* hp;                     /* Heap pointer, passed to the traversal by address */
    Sint chunk_size;               /* Requested chunk size; 0 means no chunking */
    Eterm match_list;              /* Accumulated match results, newest first */
    Eterm* prev_continuation_tptr; /* Elements of the previous continuation tuple;
                                      NULL when not resuming */
} select_chunk_context_t;

/*
 * The pattern cannot match anything: a chunked select returns am_EOT,
 * a plain select returns the empty list.
 */
static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret)
{
    select_chunk_context_t* sel = (select_chunk_context_t*) ctx_base;

    if (sel->chunk_size > 0) {
        *ret = am_EOT;
    }
    else {
        *ret = NIL;
    }
    return DB_ERROR_NONE;
}

/*
 * Prepend a successful match result to the context's match list, using the
 * cons cell that ctx->hp currently points at.
 * Returns 1 if the result was a value (counted as a match), 0 otherwise.
 */
static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    select_chunk_context_t* sel = (select_chunk_context_t*) ctx_base;

    if (!is_value(match_res)) {
        return 0;
    }
    sel->match_list = CONS(sel->hp, match_res, sel->match_list);
    return 1;
}

/*
 * Produce the result of the first select / select_chunk call once the
 * traversal cycle has ended.
 *
 * Without chunking the accumulated match list is returned as is. With
 * chunking the result is one of:
 *   - am_EOT (or NIL when chunk_size is 0) if nothing was iterated,
 *   - {MatchList, Continuation} if there is surplus data or more slots,
 *   - {MatchList, am_EOT} if the table is exhausted with data to return,
 *   - am_EOT if the table is exhausted with nothing to return.
 *
 * When a continuation is created it takes over the match program, so *mpp
 * is set to NULL to keep the caller from freeing it.
 */
static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm leftover = NIL;
    Sint leftover_len = 0;
    Eterm tid;
    Eterm prog_ref;
    Eterm continuation;

    if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
        /* We didn't get to iterate a single time, which means EOT */
        ASSERT(ctx->match_list == NIL);
        *ret = (ctx->chunk_size > 0 ? am_EOT : NIL);
        return DB_ERROR_NONE;
    }

    ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);

    if (!ctx->chunk_size) {
        /* Plain select: everything found is the answer */
        *ret = ctx->match_list;
        return DB_ERROR_NONE;
    }

    if (got > ctx->chunk_size) {
        /* Split the list into the part returned now and the leftover part */
        Eterm cell = ctx->match_list;
        Sint n;

        leftover = ctx->match_list;
        for (n = got; n > ctx->chunk_size + 1; --n) {
            cell = CDR(list_val(cell));
            ++leftover_len;
        }
        ++leftover_len;
        ctx->match_list = CDR(list_val(cell));
        /* Destructive, the list has never been in 'user space' */
        CDR(list_val(cell)) = NIL;
    }

    if (leftover == NIL && slot_ix < 0) {
        /* All data is exhausted */
        if (ctx->match_list == NIL) {
            /* Reached the end of the table with no data to return */
            *ret = am_EOT;
            return DB_ERROR_NONE;
        }
        /* No more data to search but still a result to return to the caller */
        ctx->hp = HAlloc(ctx->p, 3);
        *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT);
        return DB_ERROR_NONE;
    }

    /* Need more calls: hand back a continuation along with the chunk */
    tid = ctx->tid;
    ctx->hp = HAllocX(ctx->p,
                      3 + 7 + ERTS_MAGIC_REF_THING_SIZE,
                      ERTS_MAGIC_REF_THING_SIZE);
    prog_ref = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp);
    if (is_atom(tid)) {
        tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
    }
    continuation = TUPLE6(
            ctx->hp,
            tid,
            make_small(slot_ix),
            make_small(ctx->chunk_size),
            prog_ref, leftover,
            make_small(leftover_len));
    *mpp = NULL; /* Otherwise the caller will destroy it */
    ctx->hp += 7;
    *ret = TUPLE2(ctx->hp, ctx->match_list, continuation);
    return DB_ERROR_NONE;
}

/*
 * Trap out of a select / select_chunk traversal.
 *
 * Builds the continuation
 *   {Tid, SlotIx, ChunkSize, MatchProgRef, MatchList, Got}
 * and prepares a trap to ets_select_continue_exp. On the first trap a new
 * match program reference is created and *mpp is set to NULL so the caller
 * does not free the program. On later traps the table id, chunk size and
 * match program reference are copied from the previous continuation.
 * Consumes all remaining reductions of the process.
 */
static int select_chunk_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm* prev = ctx->prev_continuation_tptr;
    Eterm* hp;
    Eterm cont;

    BUMP_ALL_REDS(ctx->p);

    if (prev != NULL) {
        /* Not the first time we're trapping; reuse continuation terms */
        hp = HAlloc(ctx->p, 7);
        cont = TUPLE6(
                hp,
                prev[1],
                make_small(slot_ix),
                prev[3],
                prev[4],
                ctx->match_list,
                make_small(got));
    }
    else {
        /* First time we're trapping */
        Eterm tid = ctx->tid;
        Eterm prog_ref;

        hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE,
                     ERTS_MAGIC_REF_THING_SIZE);
        if (is_atom(tid)) {
            tid = erts_db_make_tid(ctx->p, &ctx->tb->common);
        }
        prog_ref = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp);
        cont = TUPLE6(
                hp,
                tid,
                make_small(slot_ix),
                make_small(ctx->chunk_size),
                prog_ref,
                ctx->match_list,
                make_small(got));
        *mpp = NULL; /* otherwise the caller will destroy it */
    }

    ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p, cont);
    return DB_ERROR_NONE;
}

/*
 * Plain select: a chunked select with chunk size 0, i.e. no chunking.
 */
static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern,
                          int reverse, Eterm *ret)
{
    const Sint no_chunking = 0;

    return db_select_chunk_hash(p, tbl, tid, pattern, no_chunking, reverse,
                                ret);
}

/*
 * Start a select / select_chunk traversal over the hash table.
 *
 *      p:          Calling process
 *      tbl:        Table to traverse
 *      tid:        Table identifier, stored in any continuation created
 *      pattern:    Match specification
 *      chunk_size: Max number of results per call; 0 means no chunking
 *      reverse:    Not used by this function
 *      ret:        Receives the result term (or a prepared trap)
 *
 * Buckets are locked for reading only. The return value and *ret come from
 * match_traverse and the select_chunk_* callbacks.
 */
static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Sint chunk_size,
                                int reverse, Eterm *ret)
{
    select_chunk_context_t ctx;

    ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match;
    ctx.base.on_match_res         = select_chunk_on_match_res;
    ctx.base.on_loop_ended        = select_chunk_on_loop_ended;
    /* This line used to end in ',' which silently fused it with the next
     * assignment through the comma operator. */
    ctx.base.on_trap              = select_chunk_on_trap;
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = NIL;
    ctx.prev_continuation_tptr = NULL; /* Not resuming a previous traversal */

    return match_traverse(
            ctx.p, ctx.tb,
            pattern, NULL,
            ctx.chunk_size,
            MAX_SELECT_CHUNK_ITERATIONS,
            &ctx.hp, 0,
            &ctx.base, ret);
}

/*
 *
 * select_continue match traversal
 *
 */

/*
 * Produce the result of a resumed select / select_chunk traversal.
 *
 * Same result shapes as for the initial call, except that the table id,
 * chunk size and match program reference of a new continuation are copied
 * from the previous one, and a surplus beyond chunk_size is copied into a
 * fresh list instead of being split off destructively.
 */
static
int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base,
                                        Sint slot_ix, Sint got,
                                        Sint iterations_left, Binary** mpp,
                                        Eterm* ret)
{
    select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base;
    Eterm* prev = ctx->prev_continuation_tptr;
    Eterm* hp;
    Eterm leftover = NIL;
    Sint leftover_len = 0;
    Eterm cont;

    ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
    BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);

    if (!ctx->chunk_size) {
        /* Plain select: everything found is the answer */
        *ret = ctx->match_list;
        return DB_ERROR_NONE;
    }

    if (got > ctx->chunk_size) {
        /* Cannot write destructively here, the list may have been in
           user space; copy the surplus elements instead */
        Sint surplus = got - ctx->chunk_size;

        hp = HAlloc(ctx->p, surplus * 2);
        for (; leftover_len < surplus; ++leftover_len) {
            Eterm* cell = list_val(ctx->match_list);

            leftover = CONS(hp, CAR(cell), leftover);
            hp += 2;
            ctx->match_list = CDR(cell);
        }
    }

    if (leftover != NIL || slot_ix >= 0) {
        /* More to come: return the chunk together with a continuation */
        hp = HAlloc(ctx->p, 3 + 7);
        cont = TUPLE6(
                hp,
                prev[1],
                make_small(slot_ix),
                prev[3],
                prev[4],
                leftover,
                make_small(leftover_len));
        hp += 7;
        *ret = TUPLE2(hp, ctx->match_list, cont);
        return DB_ERROR_NONE;
    }

    if (ctx->match_list == NIL) {
        /* Table exhausted and nothing left to hand out */
        *ret = am_EOT;
        return DB_ERROR_NONE;
    }

    hp = HAlloc(ctx->p, 3);
    *ret = TUPLE2(hp, ctx->match_list, am_EOT);
    return DB_ERROR_NONE;
}

/*
 * Resume a select / select_chunk traversal from a continuation; this is
 * what a trapped select ends up calling.
 *
 * The continuation must be a 6-tuple
 *   {Tid, SlotIx, ChunkSize, MatchProgRef, MatchList, Got}
 * with small SlotIx, a small non-negative ChunkSize, a valid match program
 * reference, a list (or NIL) MatchList and a small non-negative Got.
 * Anything else gives DB_ERROR_BADPARAM with *ret set to NIL.
 */
static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation,
                                   Eterm* ret)
{
    select_chunk_context_t ctx;
    Eterm* elems;
    Binary* match_prog;
    Sint chunk_size;
    Sint matched;

    /* Decode continuation. We know it's a tuple but not the arity or anything else */
    ASSERT(is_tuple(continuation));
    elems = tuple_val(continuation);

    if (arityval(*elems) != 6) {
        goto badparam;
    }
    if (!is_small(elems[2]) || !is_small(elems[3])) {
        goto badparam;
    }
    if (!is_list(elems[5]) && elems[5] != NIL) {
        goto badparam;
    }
    if (!is_small(elems[6])) {
        goto badparam;
    }

    chunk_size = signed_val(elems[3]);
    if (chunk_size < 0) {
        goto badparam;
    }

    match_prog = erts_db_get_match_prog_binary(elems[4]);
    if (match_prog == NULL) {
        goto badparam;
    }

    matched = signed_val(elems[6]);
    if (matched < 0) {
        goto badparam;
    }

    /* Proceed */
    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = elems[1];
    ctx.hp = NULL;
    ctx.chunk_size = chunk_size;
    ctx.match_list = elems[5];
    ctx.prev_continuation_tptr = elems;
    ctx.base.on_match_res  = select_chunk_on_match_res;
    ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended;
    ctx.base.on_trap       = select_chunk_on_trap;

    return match_traverse_continue(
            ctx.p, ctx.tb, ctx.chunk_size,
            MAX_SELECT_CHUNK_ITERATIONS, &ctx.hp,
            signed_val(elems[2]), matched, &match_prog, 0,
            &ctx.base, ret);

badparam:
    *ret = NIL;
    return DB_ERROR_BADPARAM;
}

#undef MAX_SELECT_CHUNK_ITERATIONS


/*
 *
 * select_count match traversal
 *
 */

/* Iteration budget handed to the traversal for each select_count call;
 * also the base from which consumed reductions are computed. */
#define MAX_SELECT_COUNT_ITERATIONS 1000

/* Traversal context for select_count and its continuation */
typedef struct {
    match_callbacks_t base;        /* Must be first: callbacks cast it back to this type */
    Process* p;                    /* Calling process */
    DbTableHash* tb;               /* Table being traversed */
    Eterm tid;                     /* Table identifier stored in continuations */
    Eterm* prev_continuation_tptr; /* Elements of the previous continuation tuple;
                                      NULL on the initial call */
} select_count_context_t;

/*
 * The pattern cannot match anything, so the count is 0.
 */
static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base,
                                             Eterm* ret)
{
    const Eterm zero = make_small(0);

    *ret = zero;
    return DB_ERROR_NONE;
}

/*
 * A term is counted only when the match program returned the atom 'true'.
 */
static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                     HashDbTerm*** current_ptr_ptr,
                                     Eterm match_res)
{
    if (match_res == am_true) {
        return 1;
    }
    return 0;
}

/*
 * Traversal finished: charge the process for the iterations used and
 * return the number of matches as an integer term.
 */
static int select_count_on_loop_ended(match_callbacks_t* ctx_base,
                                      Sint slot_ix, Sint got,
                                      Sint iterations_left, Binary** mpp,
                                      Eterm* ret)
{
    select_count_context_t* cnt = (select_count_context_t*) ctx_base;
    Sint used;

    ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
    used = MAX_SELECT_COUNT_ITERATIONS - iterations_left;
    BUMP_REDS(cnt->p, used);
    *ret = erts_make_integer(got, cnt->p);
    return DB_ERROR_NONE;
}

/*
 * Trap out of a select_count traversal; the continuation is built by
 * on_simple_trap and resumed through ets_select_count_continue_exp.
 */
static int select_count_on_trap(match_callbacks_t* ctx_base,
                                Sint slot_ix, Sint got,
                                Binary** mpp, Eterm* ret)
{
    select_count_context_t* cnt = (select_count_context_t*) ctx_base;

    return on_simple_trap(&ets_select_count_continue_exp,
                          cnt->p, cnt->tb, cnt->tid,
                          cnt->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}

/*
 * Start a select_count traversal: count the terms for which the match
 * specification returns 'true'. Buckets are locked for reading only and
 * no chunking is used.
 */
static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Eterm *ret)
{
    select_count_context_t ctx;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL; /* Not resuming a previous traversal */
    ctx.base.on_nothing_can_match = select_count_on_nothing_can_match;
    ctx.base.on_match_res         = select_count_on_match_res;
    ctx.base.on_loop_ended        = select_count_on_loop_ended;
    ctx.base.on_trap              = select_count_on_trap;

    return match_traverse(ctx.p, ctx.tb,
                          pattern, NULL,
                          0 /* no chunking */,
                          MAX_SELECT_COUNT_ITERATIONS,
                          NULL, 0 /* read locks */,
                          &ctx.base, ret);
}

/*
 * Resume a select_count traversal from a continuation; this is what a
 * trapped select_count ends up calling.
 *
 * Returns DB_ERROR_BADPARAM with *ret set to NIL if the continuation cannot
 * be unpacked by unpack_simple_continuation.
 */
static int db_select_count_continue_hash(Process* p, DbTable* tbl,
                                         Eterm continuation, Eterm* ret)
{
    select_count_context_t ctx;
    Eterm* elems;
    Eterm tid;
    Binary* match_prog;
    Sint matched;
    Sint slot_ix;

    /* Default result; also what the caller sees on a bad continuation */
    *ret = NIL;

    if (unpack_simple_continuation(continuation, &elems, &tid, &slot_ix,
                                   &match_prog, &matched)) {
        return DB_ERROR_BADPARAM;
    }

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = elems;
    ctx.base.on_match_res  = select_count_on_match_res;
    ctx.base.on_loop_ended = select_count_on_loop_ended;
    ctx.base.on_trap       = select_count_on_trap;

    return match_traverse_continue(ctx.p, ctx.tb,
                                   0 /* no chunking */,
                                   MAX_SELECT_COUNT_ITERATIONS,
                                   NULL, slot_ix, matched, &match_prog,
                                   0 /* read locks */,
                                   &ctx.base, ret);
}

#undef MAX_SELECT_COUNT_ITERATIONS


/*
 *
 * select_delete match traversal
 *
 */

/* Iteration budget handed to the traversal for each select_delete call;
 * also the base from which consumed reductions are computed. */
#define MAX_SELECT_DELETE_ITERATIONS 1000

/* Traversal context for select_delete and its continuation */
typedef struct {
    match_callbacks_t base;        /* Must be first: callbacks cast it back to this type */
    Process* p;                    /* Calling process */
    DbTableHash* tb;               /* Table being traversed */
    Eterm tid;                     /* Table identifier stored in continuations */
    Eterm* prev_continuation_tptr; /* Elements of the previous continuation tuple;
                                      NULL on the initial call */
    erts_aint_t fixated_by_me;     /* Fixation count attributed to this operation;
                                      NFIXED(tb) above it means fixated by others */
    Uint last_pseudo_delete;       /* Slot of the latest add_fixed_deletion() call;
                                      (Uint) -1 when there has been none */
    HashDbTerm* free_us;           /* Unlinked terms awaiting free_term_list() */
} select_delete_context_t;

/*
 * The pattern cannot match anything, so nothing is deleted: return 0.
 */
static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base,
                                              Eterm* ret)
{
    const Eterm zero = make_small(0);

    *ret = zero;
    return DB_ERROR_NONE;
}

/*
 * Delete the current term if the match program returned 'true'.
 *
 * When the table is fixated by someone else the term is only marked as
 * pseudo-deleted, provided a fixed deletion is (or already was) recorded
 * for this slot. Otherwise the term is unlinked from the chain and queued
 * on ctx->free_us. The item counter of the table is decremented either way.
 *
 * Returns 1 if the term was deleted, 0 if it did not match.
 */
static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                      HashDbTerm*** current_ptr_ptr,
                                      Eterm match_res)
{
    select_delete_context_t* ctx = (select_delete_context_t*) ctx_base;
    HashDbTerm** link = *current_ptr_ptr;
    int mark_only = 0;

    if (match_res != am_true) {
        return 0;
    }

    if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */
        if (slot_ix == ctx->last_pseudo_delete) {
            /* A fixed deletion is already recorded for this slot */
            mark_only = 1;
        }
        else if (add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me)) {
            ctx->last_pseudo_delete = slot_ix;
            mark_only = 1;
        }
    }

    if (mark_only) {
        (*link)->pseudo_deleted = 1;
    }
    else {
        /* Unlink the term and push it onto the list of terms to free */
        HashDbTerm* victim = *link;

        *link = victim->next;
        victim->next = ctx->free_us;
        ctx->free_us = victim;
    }
    erts_atomic_dec_nob(&ctx->tb->common.nitems);

    return 1;
}

/*
 * Traversal finished: free the unlinked terms, charge the process for the
 * iterations used, try to shrink the table if anything was deleted and
 * return the number of deleted terms as an integer term.
 */
static int select_delete_on_loop_ended(match_callbacks_t* ctx_base,
                                       Sint slot_ix, Sint got,
                                       Sint iterations_left, Binary** mpp,
                                       Eterm* ret)
{
    select_delete_context_t* del = (select_delete_context_t*) ctx_base;
    Sint used;

    free_term_list(del->tb, del->free_us);
    del->free_us = NULL;

    ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
    used = MAX_SELECT_DELETE_ITERATIONS - iterations_left;
    BUMP_REDS(del->p, used);

    if (got != 0) {
        try_shrink(del->tb);
    }
    *ret = erts_make_integer(got, del->p);
    return DB_ERROR_NONE;
}

/*
 * Trap out of a select_delete traversal. The terms unlinked so far are
 * freed first; the continuation is built by on_simple_trap and resumed
 * through ets_select_delete_continue_exp.
 */
static int select_delete_on_trap(match_callbacks_t* ctx_base,
                                 Sint slot_ix, Sint got,
                                 Binary** mpp, Eterm* ret)
{
    select_delete_context_t* del = (select_delete_context_t*) ctx_base;

    free_term_list(del->tb, del->free_us);
    del->free_us = NULL;

    return on_simple_trap(&ets_select_delete_continue_exp,
                          del->p, del->tb, del->tid,
                          del->prev_continuation_tptr,
                          slot_ix, got, mpp, ret);
}

/*
 * Start a select_delete traversal: delete every term for which the match
 * specification returns 'true'. Buckets are locked for writing and no
 * chunking is used.
 */
static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
                                 Eterm pattern, Eterm *ret)
{
    select_delete_context_t ctx;

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = NULL; /* Not resuming a previous traversal */
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;

    /* TODO: something nicer */
    if (ctx.tb->common.is_thread_safe) {
        ctx.fixated_by_me = 0;
    }
    else {
        ctx.fixated_by_me = 1;
    }

    ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match;
    ctx.base.on_match_res         = select_delete_on_match_res;
    ctx.base.on_loop_ended        = select_delete_on_loop_ended;
    ctx.base.on_trap              = select_delete_on_trap;

    return match_traverse(ctx.p, ctx.tb,
                          pattern, NULL,
                          0 /* no chunking */,
                          MAX_SELECT_DELETE_ITERATIONS,
                          NULL, 1 /* write locks */,
                          &ctx.base, ret);
}

/*
 * Resume a select_delete traversal from a continuation; this is what a
 * trapped select_delete ends up calling.
 *
 * Returns DB_ERROR_BADPARAM with *ret set to NIL if the continuation cannot
 * be unpacked by unpack_simple_continuation.
 */
static int db_select_delete_continue_hash(Process* p, DbTable* tbl,
                                          Eterm continuation, Eterm* ret)
{
    select_delete_context_t ctx;
    Eterm* elems;
    Eterm tid;
    Binary* match_prog;
    Sint matched;
    Sint slot_ix;

    if (unpack_simple_continuation(continuation, &elems, &tid, &slot_ix,
                                   &match_prog, &matched)) {
        *ret = NIL;
        return DB_ERROR_BADPARAM;
    }

    ctx.p = p;
    ctx.tb = &tbl->hash;
    ctx.tid = tid;
    ctx.prev_continuation_tptr = elems;
    ctx.last_pseudo_delete = (Uint) -1;
    ctx.free_us = NULL;

    /* TODO: something nicer */
    if (ONLY_WRITER(p, ctx.tb)) {
        ctx.fixated_by_me = 0;
    }
    else {
        ctx.fixated_by_me = 1;
    }

    ctx.base.on_match_res  = select_delete_on_match_res;
    ctx.base.on_loop_ended = select_delete_on_loop_ended;
    ctx.base.on_trap       = select_delete_on_trap;

    return match_traverse_continue(ctx.p, ctx.tb,
                                   0 /* no chunking */,
                                   MAX_SELECT_DELETE_ITERATIONS,
                                   NULL, slot_ix, matched, &match_prog,
                                   1 /* write locks */,
                                   &ctx.base, ret);
}

#undef MAX_SELECT_DELETE_ITERATIONS


/*
 *
 * select_replace match traversal
 *
 */

#define MAX_SELECT_REPLACE_ITERATIONS 1000

/*
 * Traversal context for select_replace.
 * 'base' is the first member; the select_replace callbacks cast the
 * match_callbacks_t pointer they receive back to this type.
 */
typedef struct {
    match_callbacks_t base;         /* callbacks handed to the match traversal */
    Process* p;                     /* calling process */
    DbTableHash* tb;                /* table being traversed */
    Eterm tid;                      /* table identifier, forwarded to on_simple_trap() */
    Eterm* prev_continuation_tptr;  /* NULL on the initial call, otherwise the
                                     * pointer unpacked from the continuation */
} select_replace_context_t;

/*
 * on_nothing_can_match callback for select_replace:
 * the result is the small integer 0 (no objects replaced).
 */
static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base,
                                               Eterm* ret)
{
    const Eterm none_replaced = make_small(0);

    *ret = none_replaced;
    return DB_ERROR_NONE;
}

/*
 * on_match_res callback for select_replace.
 * When the match program produced a value, a new term is built from it,
 * linked in where the current term was, and the current term is freed.
 * *current_ptr_ptr is then advanced to the 'next' field of the new term.
 * Returns 1 if an object was replaced, 0 otherwise.
 */
static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix,
                                       HashDbTerm*** current_ptr_ptr,
                                       Eterm match_res)
{
    select_replace_context_t* sr = (select_replace_context_t*) ctx_base;
    DbTableHash* tb = sr->tb;
    HashDbTerm* old_term;
    HashDbTerm* fresh;
    HashDbTerm* successor;
    HashValue old_hval;

    if (!is_value(match_res)) {
        return 0;
    }

    old_term = **current_ptr_ptr;
#ifdef DEBUG
    {
        Eterm key = db_getkey(tb->common.keypos, match_res);
        ASSERT(is_value(key));
        ASSERT(eq(key, GETKEY(tb, old_term->dbterm.tpl)));
    }
#endif
    /* Remember what we need from the old term before building the new one */
    successor = old_term->next;
    old_hval = old_term->hvalue;

    fresh = new_dbterm(tb, match_res);
    fresh->next = successor;
    fresh->hvalue = old_hval;
    fresh->pseudo_deleted = 0;

    free_term(tb, old_term);
    **current_ptr_ptr = fresh;          /* previous object now points at the new term */
    *current_ptr_ptr = &fresh->next;    /* step past the new term */
    return 1;
}

/*
 * on_loop_ended callback for select_replace.
 * Charges the process for the work done and returns the number of
 * replaced objects as an integer term in *ret.
 */
static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix,
                                        Sint got, Sint iterations_left,
                                        Binary** mpp, Eterm* ret)
{
    select_replace_context_t* sr = (select_replace_context_t*) ctx_base;
    Sint cost;

    ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);

    /*
     * Cost grows with both the iterations used and the number of objects
     * replaced, capped at twice the iteration budget.
     */
    cost = MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
               (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got);
    BUMP_REDS(sr->p, cost);

    *ret = erts_make_integer(got, sr->p);
    return DB_ERROR_NONE;
}

/*
 * on_trap callback for select_replace: delegates to on_simple_trap(),
 * passing ets_select_replace_continue_exp and the context's saved state.
 */
static int select_replace_on_trap(match_callbacks_t* ctx_base,
                                  Sint slot_ix, Sint got,
                                  Binary** mpp, Eterm* ret)
{
    select_replace_context_t* sr = (select_replace_context_t*) ctx_base;
    int res;

    res = on_simple_trap(&ets_select_replace_continue_exp,
                         sr->p, sr->tb, sr->tid,
                         sr->prev_continuation_tptr,
                         slot_ix, got, mpp, ret);
    return res;
}

/*
 * Entry point for select_replace on a hash table.
 * Sets up a select_replace context and starts a match traversal over
 * 'pattern', with db_match_keeps_key as the extra match spec validator
 * and MAX_SELECT_REPLACE_ITERATIONS as the iteration budget.
 */
static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
{
    select_replace_context_t sr;
    Sint no_chunking = 0;

    /* Bag tables are not supported for now: the implementation had both
     * semantic consistency and performance issues.
     */
    ASSERT(!(tbl->hash.common.status & DB_BAG));

    /* Per-call state */
    sr.p = p;
    sr.tb = &tbl->hash;
    sr.tid = tid;
    sr.prev_continuation_tptr = NULL;

    /* Callbacks */
    sr.base.on_nothing_can_match = select_replace_on_nothing_can_match;
    sr.base.on_match_res         = select_replace_on_match_res;
    sr.base.on_loop_ended        = select_replace_on_loop_ended;
    sr.base.on_trap              = select_replace_on_trap;

    return match_traverse(sr.p, sr.tb,
                          pattern, db_match_keeps_key,
                          no_chunking,
                          MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
                          &sr.base, ret);
}

/*
 * Resumes a select_replace that trapped.
 * *ret is NIL and DB_ERROR_BADPARAM is returned if the continuation
 * cannot be unpacked; otherwise the traversal continues from the saved
 * slot with a fresh iteration budget.
 */
static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
{
    select_replace_context_t sr;
    Eterm* cont_tptr;
    Eterm cont_tid;
    Binary* match_prog;
    Sint replaced_so_far;
    Sint resume_slot;
    Sint no_chunking = 0;

    *ret = NIL;

    if (unpack_simple_continuation(continuation, &cont_tptr, &cont_tid,
                                   &resume_slot, &match_prog,
                                   &replaced_so_far)) {
        /* *ret is still NIL here */
        return DB_ERROR_BADPARAM;
    }

    /* Per-call state */
    sr.p = p;
    sr.tb = &tbl->hash;
    sr.tid = cont_tid;
    sr.prev_continuation_tptr = cont_tptr;

    /* Callbacks (on_nothing_can_match is not set when continuing) */
    sr.base.on_match_res  = select_replace_on_match_res;
    sr.base.on_loop_ended = select_replace_on_loop_ended;
    sr.base.on_trap       = select_replace_on_trap;

    return match_traverse_continue(sr.p, sr.tb, no_chunking,
                                   MAX_SELECT_REPLACE_ITERATIONS,
                                   NULL, resume_slot, replaced_so_far,
                                   &match_prog, 1,
                                   &sr.base, ret);
}


/*
 * Remove the live objects stored under 'key' and return them in *ret as
 * the term produced by get_term_list(). *ret is NIL when no live object
 * with that key is found. Always returns DB_ERROR_NONE.
 */
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp, *b;
    HashDbTerm *free_us = NULL;   /* unlinked terms, freed after unlocking */
    HashValue hval = MAKE_HASH(key);
    erts_rwmtx_t *lck = WLOCK_HASH(tb, hval);
    int ix = hash_to_ix(tb, hval);
    int nitems_diff = 0;          /* minus the number of objects taken */

    *ret = NIL;
    for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) {
        if (has_live_key(tb, b, key, hval)) {
            HashDbTerm *bend;

            /* bend is set by get_term_list(); the loop below removes
             * every entry from b up to (not including) bend. */
            *ret = get_term_list(p, tb, key, hval, b, &bend);
            while (b != bend) {
                --nitems_diff;
                /* Only the first removed entry is a candidate for pseudo
                 * deletion, and only if the table is fixed and the fixed
                 * deletion could be recorded. */
                if (nitems_diff == -1 && IS_FIXED(tb)
                    && add_fixed_deletion(tb, ix, 0)) {
                    /* Pseudo remove (no need to keep several of same key) */
                    bp = &b->next;
                    b->pseudo_deleted = 1;
                    b = b->next;
                } else {
                    /* Unlink from the chain and push onto free_us */
                    HashDbTerm* next = b->next;
                    b->next = free_us;
                    free_us = b;
                    b = *bp = next;
                }
            }
            break;
        }
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
        /* Adjust the item count and see if the table should shrink */
        erts_atomic_add_nob(&tb->common.nitems, nitems_diff);
        try_shrink(tb);
    }
    /* The unlinked terms are released outside the bucket lock */
    free_term_list(tb, free_us);
    return DB_ERROR_NONE;
}


/*
** Other interface routines (not directly coupled to one bif)
*/

/* Initialization hook for the hash table implementation; performs no work. */
void db_initialize_hash(void)
{
}


/*
 * Mark every object in the table as pseudo deleted, yielding when the
 * reduction budget is used up.
 *
 * tbl:  table to process; the caller must hold the table write lock.
 * reds: reduction budget for this call.
 *
 * Returns -1 if the work is not finished (call again to continue), or
 * otherwise the number of reductions left (never negative). On completion
 * the table's item count is set to 0.
 *
 * Progress is kept in a FixedDeletion record linked into tb->fixdel:
 * 'slot' is the next bucket to process and 'trap' is set while a call
 * has yielded. 'all' is set both while yielded and when done; the
 * continue-after-trap branch below asserts it.
 */
static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
{
    const int LOOPS_PER_REDUCTION  = 8;
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel;
    SWord loops = reds * LOOPS_PER_REDUCTION;
    int i;

    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb));

    fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel);
    if (fixdel && fixdel->trap) {
        /* Continue after trap */
        ASSERT(fixdel->all);
        ASSERT(fixdel->slot < NACTIVE(tb));
        i = fixdel->slot;
    }
    else {
        /* First call */
        int ok;
        fixdel = alloc_fixdel(tb);
        ok = link_fixdel(tb, fixdel, 0);
        ASSERT(ok); (void)ok;
        i = 0;
    }

    /* One bucket per loop; at least one bucket is always processed */
    do {
        HashDbTerm* b;
        for (b = BUCKET(tb,i); b; b = b->next)
            b->pseudo_deleted = 1;
    } while (++i < NACTIVE(tb) && --loops > 0);

    if (i < NACTIVE(tb)) {
        /* Yield */
        fixdel->slot = i;
        /* Keep 'all' set across the trap: the continuation branch above
         * relies on it. */
        fixdel->all = 1;
        fixdel->trap = 1;
        return -1;
    }

    fixdel->slot = NACTIVE(tb) - 1;
    fixdel->all = 1;
    fixdel->trap = 0;
    erts_atomic_set_nob(&tb->common.nitems, 0);
    return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION;
}


/*
 * Print a description of the hash table (used for dumps): bucket count,
 * chain length statistics, fixation state and, if 'show' is non-zero,
 * the contents of every non-empty bucket.
 */
static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    DbHashStats stats;
    int saved_thread_safe;
    int slot;

    erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb));

    /* When crash dumping, the table is temporarily flagged as thread
       safe so that gathering the statistics does not take any locks */
    saved_thread_safe = tbl->common.is_thread_safe;
    if (ERTS_IS_CRASH_DUMPING) {
        tbl->common.is_thread_safe = 1;
    }
    db_calc_stats_hash(tb, &stats);
    tbl->common.is_thread_safe = saved_thread_safe;

    erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len);
    erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len);
    erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len);
    erts_print(to, to_arg, "Chain Length Std Dev: %f\n",
               stats.std_dev_chain_len);
    erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n",
               stats.std_dev_expected);

    if (IS_FIXED(tb)) {
        erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items);
    } else {
        erts_print(to, to_arg, "Fixed: false\n");
    }

    if (!show) {
        return;
    }

    for (slot = 0; slot < NACTIVE(tb); slot++) {
        HashDbTerm* term = BUCKET(tb,slot);

        if (term == NULL) {
            continue;   /* empty buckets are not printed */
        }
        erts_print(to, to_arg, "%d: [", slot);
        for ( ; term != NULL; term = term->next) {
            /* pseudo deleted objects are flagged with a star */
            if (is_pseudo_deleted(term)) {
                erts_print(to, to_arg, "*");
            }
            if (tb->common.compress) {
                /* compressed table: only the key is printed */
                Eterm key = GETKEY(tb, term->dbterm.tpl);
                erts_print(to, to_arg, "key=%T", key);
            } else {
                Eterm obj = make_tuple(term->dbterm.tpl);
                erts_print(to, to_arg, "%T", obj);
            }
            if (term->next != NULL) {
                erts_print(to, to_arg, ",");
            }
        }
        erts_print(to, to_arg, "]\n");
    }
}

/*
 * Free a table that holds no items by calling
 * db_free_table_continue_hash() until it no longer reports unfinished
 * work. Always returns 0.
 */
static int db_free_empty_table_hash(DbTable *tbl)
{
    SWord left;

    ASSERT(NITEMS(tbl) == 0);
    do {
        left = db_free_table_continue_hash(tbl, ERTS_SWORD_MAX);
    } while (left < 0);
    return 0;
}

/*
 * Free the table's resources in steps bounded by 'reds': first the list of
 * fixed deletions, then the segments (including any records in them), and
 * finally the fine grained locks if there are any.
 *
 * Returns the remaining reductions. A negative value means the work is not
 * finished and the function must be called again.
 */
static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel);
    ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));

    /* Step 1: free the fixed deletion records, one reduction each */
    while (fixdel != NULL) {
	FixedDeletion *fx = fixdel;

	fixdel = fx->next;
        free_fixdel(tb, fx);
	if (--reds < 0) {
	    /* Out of reductions: store the rest of the list so that the
	     * next call continues from here */
	    erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
	    return reds;		/* Not done */
	}
    }
    erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);

    /* Step 2: free segments from the top until no slots remain. Each
     * segment costs EXT_SEGSZ/64 plus the number of records freed. */
    while(tb->nslots != 0) {
	reds -= EXT_SEGSZ/64 + free_seg(tb, 1);

	/*
	 * If we have done enough work, get out here.
	 */
	if (reds < 0) {
	    return reds;	/* Not done */
	}
    }
    /* Step 3: destroy and free the fine grained locks, if present */
    if (tb->locks != NULL) {
	int i;
	for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
	    erts_rwmtx_destroy(GET_LOCK(tb,i)); 
	}
	erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
		     (void*)tb->locks, sizeof(DbTableHashFineLocks));
	tb->locks = NULL;
    }
    /* Only the DbTable structure itself should be accounted for now */
    ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
    return reds;			/* Done */
}



/*
** Utility routines. (static)
*/
/*
** For the select functions: analyzes the match specification 'pattern',
** determines which slots need to be searched and compiles the match
** program.
**
** tb:              the table the pattern will be run against
** pattern:         match specification, a proper list of
**                  {Match, Guard, Body} 3-tuples
** extra_validator: optional callback (may be NULL) called for every
**                  clause; a false result rejects the pattern
** mpi:             output. key_given is 1 if all clauses have a bound key,
**                  in which case lists/num_lists describe the buckets to
**                  search. something_can_match is 0 if no clause can
**                  match anything. mp is the compiled match program.
**
** Returns DB_ERROR_NONE on success and DB_ERROR_BADPARAM for a malformed
** pattern or a match specification that does not compile.
**
** NOTE(review): mpi->lists (when heap allocated) and mpi->mp are not
** released here, not even on the error paths; presumably the caller does
** that - verify against the callers.
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern, 
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi)
{
    Eterm *ptpl;
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];           /* room for 10 clauses x (match, guard, body) */
    Eterm *buff = sbuff;
    Eterm key = NIL;	       
    HashValue hval = NIL;      
    int num_heads = 0;
    int i;

    mpi->lists = mpi->dlists;
    mpi->num_lists = 0;
    mpi->key_given = 1;
    mpi->something_can_match = 0;
    mpi->mp = NULL;

    /* Count the clauses */
    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
	++num_heads;

    if (lst != NIL) {/* proper list... */
	return DB_ERROR_BADPARAM;
    }

    /* More than 10 clauses: use heap allocated buffers instead of sbuff
     * and mpi->dlists */
    if (num_heads > 10) {
	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
	mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
				sizeof(*(mpi->lists)) * num_heads);	
    }

    /* buff is split into three consecutive arrays of num_heads terms */
    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
        Eterm match;
        Eterm guard;
        Eterm body;

	/* Every clause must be a tuple of arity 3 */
	ttpl = CAR(list_val(lst));
	if (!is_tuple(ttpl)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	ptpl = tuple_val(ttpl);
	if (ptpl[0] != make_arityval(3U)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	matches[i] = match = tpl = ptpl[1];
	guards[i] = guard = ptpl[2];
	bodies[i] = body = ptpl[3];

        if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
	    if (buff != sbuff) {
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
            return DB_ERROR_BADPARAM;
        }

	/* NOTE(review): this condition has an empty body and therefore no
	 * effect */
	if (!is_list(body) || CDR(list_val(body)) != NIL ||
	    CAR(list_val(body)) != am_DollarUnderscore) {
	}
	++i;
	/* Once one clause lacks a bound key the whole table must be
	 * searched, so the remaining clauses are only collected */
	if (!(mpi->key_given)) {
	    continue;
	}
	if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
	    /* The match head is '_' or a variable */
	    (mpi->key_given) = 0;
	    (mpi->something_can_match) = 1;
	} else {
	    key = db_getkey(tb->common.keypos, tpl);
	    if (is_value(key)) {
		if (!db_has_variable(key)) {   /* Bound key */
		    int ix, search_slot;
		    HashDbTerm** bp;
		    erts_rwmtx_t* lck;
		    hval = MAKE_HASH(key);
		    lck = RLOCK_HASH(tb,hval);
		    ix = hash_to_ix(tb, hval);
		    bp = &BUCKET(tb,ix);
		    if (lck == NULL) {
			/* No bucket lock was taken: search the slot only
			 * if the key is present right now */
			search_slot = search_list(tb,key,hval,*bp) != NULL;
		    } else {
			/* No point to verify if key exist now as there may be
			   concurrent inserters/deleters anyway */
			RUNLOCK_HASH(lck);
			search_slot = 1;
		    }
		    if (search_slot) {
			int j;
			/* Add the bucket to mpi->lists unless it is
			 * already there */
			for (j=0; ; ++j) {
			    if (j == mpi->num_lists) {
				mpi->lists[mpi->num_lists].bucket = bp;
				mpi->lists[mpi->num_lists].ix = ix;
				++mpi->num_lists;
				break;
			    }
			    if (mpi->lists[j].bucket == bp) {
				ASSERT(mpi->lists[j].ix == ix);
				break;
			    }
			    ASSERT(mpi->lists[j].ix != ix);
			}
			mpi->something_can_match = 1;
		    }
		} else {
		    /* The key contains variables */
		    mpi->key_given = 0;
		    mpi->something_can_match = 1;
		}
	    }
	}
    }

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad 
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
				    num_heads, DCOMP_TABLE, NULL)) 
	== NULL) {
	if (buff != sbuff) { 
	    erts_free(ERTS_ALC_T_DB_TMP, buff);
	}
	return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) { 
	erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}

/*
 * Allocate an extended segment table with room for segment 'seg_ix'.
 * The new table has NSEG_2 entries if seg_ix is NSEG_1, otherwise
 * seg_ix + NSEG_INC entries. The entries of the current table are copied
 * and the current table and its size are remembered in the new one.
 * The new table is returned without being installed.
 */
static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
    struct segment** const prev_tab = SEGTAB(tb);
    struct ext_segtab* new_tab;
    int new_nsegs = 0;

    ASSERT(seg_ix >= NSEG_1);
    if (seg_ix == NSEG_1) {
        new_nsegs = NSEG_2;
    } else {
        new_nsegs = seg_ix + NSEG_INC;
    }
    ASSERT(new_nsegs > tb->nsegs);

    new_tab = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                                 (DbTable *) tb,
                                                 SIZEOF_EXT_SEGTAB(new_nsegs));
    new_tab->nsegs = new_nsegs;
    new_tab->prev_segtab = prev_tab;
    new_tab->prev_nsegs = tb->nsegs;

    /* Carry over the existing segment pointers */
    sys_memcpy(new_tab->segtab, prev_tab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
    /* Clear the entries that are not yet in use */
    sys_memset(&new_tab->segtab[seg_ix], 0,
               (new_nsegs-seg_ix)*sizeof(struct segment*));
#endif
    return new_tab;
}

/*
** Extend the table with one new, zeroed segment of EXT_SEGSZ slots.
** A larger segment table is allocated and installed first if the
** current one is full.
*/
static void alloc_seg(DbTableHash *tb)
{
    const int new_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
    struct segment** tab;

    ASSERT(new_ix > 0);
    if (new_ix == tb->nsegs) {
        /* The segment table has no room for another segment */
        struct ext_segtab* bigger = alloc_ext_segtab(tb, new_ix);

        SET_SEGTAB(tb, bigger->segtab);
        tb->nsegs = bigger->nsegs;
    }
    ASSERT(new_ix < tb->nsegs);

    tab = SEGTAB(tb);
    tab[new_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
                                                  (DbTable *) tb,
                                                  SIZEOF_SEGMENT(EXT_SEGSZ));
    sys_memset(tab[new_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ));
    tb->nslots += EXT_SEGSZ;
}

/*
 * Cleanup callback that frees an extended segment table.
 * 'lop_data' is the struct ext_segtab to release.
 */
static void dealloc_ext_segtab(void* lop_data)
{
    struct ext_segtab* doomed = (struct ext_segtab*) lop_data;

    erts_free(ERTS_ALC_T_DB_SEG, doomed);
}

/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty 
**
** If the freed segment was the first one in an extended segment table,
** that table is released as well and the previous one is reinstalled.
** Returns the number of records that were freed.
*/
static int free_seg(DbTableHash *tb, int free_records)
{
    const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
    struct segment** const segtab = SEGTAB(tb);
    struct segment* const segp = segtab[seg_ix];
    Uint seg_sz;
    int nrecords = 0;

    ASSERT(segp != NULL);
    /* In DEBUG builds the segment is always scanned, so that the ASSERT
     * in the loop can catch a segment wrongly assumed to be empty */
#ifndef DEBUG
    if (free_records)
#endif
    {	
	int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
	while (i--) {
	    HashDbTerm* p = segp->buckets[i];
	    while(p != 0) {		
		HashDbTerm* nxt = p->next;
		ASSERT(free_records); /* segment not empty as assumed? */
		free_term(tb, p);
		p = nxt;
		++nrecords;
	    }
	}
    }
    
    if (seg_ix >= NSEG_1) {
        /* The current segment table is embedded in a struct ext_segtab */
        struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);

        if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
            ASSERT(est->prev_segtab != NULL);
            SET_SEGTAB(tb, est->prev_segtab);
            tb->nsegs = est->prev_nsegs;

            if (!tb->common.is_thread_safe) {
                /*
                 * Table is doing a graceful shrink operation and we must avoid
                 * deallocating this segtab while it may still be read by other
                 * threads. Schedule deallocation with thread progress to make
                 * sure no lingering threads are still hanging in BUCKET macro
                 * with an old segtab pointer.
                 */
                Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
                ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
                ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
                erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
                                                        est,
                                                        &est->lop,
                                                        sz);
            }
            else
                erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
                             SIZEOF_EXT_SEGTAB(est->nsegs));
        }
    }
    /* Free the segment itself; segment 0 has FIRST_SEGSZ slots, all
     * others EXT_SEGSZ */
    seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
    erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));
    
#ifdef DEBUG
    /* Clear the entry if it still exists in the (possibly reinstalled)
     * segment table */
    if (seg_ix < tb->nsegs)
        SEGTAB(tb)[seg_ix] = NULL;
#endif
    tb->nslots -= seg_sz;
    ASSERT(tb->nslots >= 0);
    return nrecords;
}


/*
** Build a list on the heap of process p with copies of the live (not
** pseudo deleted) terms in the chain from ptr1 up to, but not including,
** ptr2.
** ptr2 may be 0 (copy to the end of the chain); ptr1 == ptr2 gives [].
** sz is the precalculated heap size, or 0 to have it calculated here.
** Each copy is put at the front of the list, so the list is in the
** reverse of chain order.
*/
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
			     Uint sz, DbTableHash* tb)
{
    HashDbTerm* cur;
    Eterm result = NIL;
    Eterm *hp, *hend;

    if (sz == 0) {
        /* Heap need: the term itself plus two words for the cons cell */
        for (cur = ptr1; cur != ptr2; cur = cur->next) {
            if (is_pseudo_deleted(cur))
                continue;
            sz += cur->dbterm.size + 2;
        }
    }

    hp = HAlloc(p, sz);
    hend = hp + sz;

    for (cur = ptr1; cur != ptr2; cur = cur->next) {
        Eterm copy;

        if (is_pseudo_deleted(cur))
            continue;
        copy = db_copy_object_from_ets(&tb->common, &cur->dbterm, &hp, &MSO(p));
        result = CONS(hp, copy, result);
        hp += 2;
    }
    /* Give back whatever part of the allocation was not used */
    HRelease(p,hend,hp);

    return result;
}

/*
 * Try to start a resize (grow or shrink) of the table.
 * With fine grained locking the is_resizing flag is atomically set and
 * the result is true only if it was not already set. Without it the
 * caller is expected to hold the table lock and the result is always true.
 */
static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
    if (DB_USING_FINE_LOCKING(tb)) {
        const erts_aint_t was_resizing =
            erts_atomic_xchg_acqb(&tb->is_resizing, 1);

        return was_resizing == 0;
    }
    ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
    return 1;
}

/*
 * End a resize started with begin_resizing(): clears the is_resizing
 * flag when fine grained locking is used, otherwise does nothing.
 */
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
    if (DB_USING_FINE_LOCKING(tb)) {
        erts_atomic_set_relb(&tb->is_resizing, 0);
    }
}

/* Grow table with one or more new buckets.
** Allocate new segment if needed.
**
** nitems is the item count that triggered the call. Buckets are added
** one at a time, at most 5 per call, for as long as nitems exceeds
** GROW_LIMIT for the current number of active buckets. Nothing is done
** if another resize is in progress or if the table is fixed.
*/
static void grow(DbTableHash* tb, int nitems)
{
    HashDbTerm** pnext;
    HashDbTerm** to_pnext;
    HashDbTerm* p;
    erts_rwmtx_t* lck;
    int nactive;
    int from_ix, to_ix;
    int szm;
    int loop_limit = 5;

    do {
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (nitems <= GROW_LIMIT(nactive)) {
            goto abort; /* already done (race) */
        }

        /* Ensure that the slot nactive exists */
        if (nactive == tb->nslots) {
            /* Time to get a new segment */
            ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
            alloc_seg(tb);
        }
        ASSERT(nactive < tb->nslots);

        /* Pick the bucket to split (from_ix); the new bucket (to_ix) is
         * slot number nactive. When all buckets under the current mask
         * are in use, the mask is widened by one bit and splitting
         * starts over from bucket 0. */
        szm = erts_atomic_read_nob(&tb->szm);
        if (nactive <= szm) {
            from_ix = nactive & (szm >> 1);
        } else {
            ASSERT(nactive == szm+1);
            from_ix = 0;
            szm = (szm<<1) | 1;
        }
        to_ix = nactive;

        /* The same lock must cover both the old and the new bucket */
        lck = WLOCK_HASH(tb, from_ix);
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
        /* Now a final double check (with the from_ix lock held)
         * that we did not get raced by a table fixer.
         */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }
        erts_atomic_set_nob(&tb->nactive, ++nactive);
        if (from_ix == 0) {
            /* The mask was widened above; store the new value */
            if (DB_USING_FINE_LOCKING(tb))
                erts_atomic_set_relb(&tb->szm, szm);
            else
                erts_atomic_set_nob(&tb->szm, szm);
        }
        /* The resize flag is released here; the split below runs with
         * only the bucket lock held */
        done_resizing(tb);

        /* Finally, let's split the bucket. We try to do it in a smart way
           to keep link order and avoid unnecessary updates of next-pointers */
        pnext = &BUCKET(tb, from_ix);
        p = *pnext;
        to_pnext = &BUCKET(tb, to_ix);
        while (p != NULL) {
            if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */
                *pnext = p->next;
                free_term(tb, p);
                p = *pnext;
            }
            else {
                int ix = p->hvalue & szm;
                if (ix != from_ix) {
                    /* p belongs to the other chain: attach it at that
                     * chain's tail and switch the roles of the chains */
                    ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
                    *to_pnext = p;
                    /* Swap "from" and "to": */
                    from_ix = ix;
                    to_pnext = pnext;
                }
                pnext = &p->next;
                p = *pnext;
            }
        }
        /* Terminate the chain that did not get the last term */
        *to_pnext = NULL;
        WUNLOCK_HASH(lck);

    }while (--loop_limit && nitems > GROW_LIMIT(nactive));

    return;
   
abort:
    done_resizing(tb);
}


/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
**
** nitems is the item count that triggered the call. Buckets are removed
** one at a time, at most 5 per call, for as long as more than
** FIRST_SEGSZ buckets are active and nitems is below SHRINK_LIMIT for
** the current number of active buckets. Nothing is done if another
** resize is in progress or if the table is fixed.
*/
static void shrink(DbTableHash* tb, int nitems)
{
    HashDbTerm** src_bp;
    HashDbTerm** dst_bp;
    HashDbTerm** bp;
    erts_rwmtx_t* lck;
    int src_ix, dst_ix, low_szm;
    int nactive;
    int loop_limit = 5;

    do {
        if (!begin_resizing(tb))
            return; /* already in progress */
        nactive = NACTIVE(tb);
        if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
            goto abort; /* already done (race) */
        }
        /* The top bucket (src) is joined into the bucket with the same
         * index under the next smaller mask (dst) */
        src_ix = nactive - 1;
        low_szm = erts_atomic_read_nob(&tb->szm) >> 1;
        dst_ix = src_ix & low_szm;

        ASSERT(dst_ix < src_ix);
        ASSERT(nactive > FIRST_SEGSZ);
        /* The same lock must cover both buckets */
        lck = WLOCK_HASH(tb, dst_ix);
        ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
        /* Double check for racing table fixers */
        if (IS_FIXED(tb)) {
            WUNLOCK_HASH(lck);
            goto abort;
        }

        src_bp = &BUCKET(tb, src_ix);
        dst_bp = &BUCKET(tb, dst_ix);
        bp = src_bp;

        /*
         * We join lists by appending "dst" at the end of "src"
         * as we must step through "src" anyway to purge pseudo deleted.
         */
        while(*bp != NULL) {
            if (is_pseudo_deleted(*bp)) {
                HashDbTerm* deleted = *bp;
                *bp = deleted->next;
                free_term(tb, deleted);
            } else {
                bp = &(*bp)->next;
            }
        }
        /* bp is now the tail of "src": hook "dst" on there, let the dst
         * bucket take over the joined chain and empty the src bucket */
        *bp = *dst_bp;
        *dst_bp = *src_bp;
        *src_bp = NULL;

        nactive = src_ix;
        erts_atomic_set_nob(&tb->nactive, nactive);
        if (dst_ix == 0) {
            /* Bucket 0 was the target: switch to the smaller mask */
            erts_atomic_set_relb(&tb->szm, low_szm);
        }
        WUNLOCK_HASH(lck);

        /* Free the top segment once a whole segment is unused */
        if (tb->nslots - src_ix >= EXT_SEGSZ) {
            free_seg(tb, 0);
        }
        done_resizing(tb);

    } while (--loop_limit
             && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
    return;

abort:
    done_resizing(tb);
}

/*
 * Search a bucket chain for a live object with the given key.
 * Returns the first term for which has_live_key() is true,
 * or NULL if there is none.
 */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key, 
			       HashValue hval, HashDbTerm *list)
{
    HashDbTerm* term;

    for (term = list; term != NULL; term = term->next) {
        if (has_live_key(tb,term,key,hval)) {
            return term;
        }
    }
    return NULL;
}


/* This function is called by the next AND the select BIF. */
/* It returns the next live object in the table, or NULL if there are no more. */
/* The search starts at 'list' in bucket *iptr and goes on through the */
/* buckets given by next_slot(). *iptr is updated when the object is found */
/* in a later bucket and is left unchanged when NULL is returned. */
/* In-bucket: RLOCKED */
/* Out-bucket: RLOCKED unless NULL */
static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr,
			     HashDbTerm *list)
{
    HashDbTerm* term;
    int slot;

    ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));

    /* First the rest of the current chain */
    term = list;
    while (term != NULL) {
        if (!is_pseudo_deleted(term)) {
            return term;
        }
        term = term->next;
    }

    /* Then the following buckets, until next_slot() returns 0 */
    slot = *iptr;
    for (;;) {
        slot = next_slot(tb, slot, lck_ptr);
        if (slot == 0) {
            break;
        }
        for (term = BUCKET(tb,slot); term != NULL; term = term->next) {
            if (!is_pseudo_deleted(term)) {
                *iptr = slot;
                return term;
            }
        }
    }
    return NULL;
}

/*
 * Look up the object with 'key' for update and fill in 'handle'.
 *
 * If no live object has the key and 'obj' is THE_NON_VALUE, 0 is returned
 * and the bucket lock is released. If 'obj' is a tuple, a copy of it with
 * 'key' in the key position is inserted as a new object (replacing a
 * pseudo deleted object with the same key, if there is one) and
 * DB_NEW_OBJECT is set in handle->flags.
 *
 * When 1 is returned the bucket write lock is still held and is stored in
 * handle->lck; it is released by db_finalize_dbterm_hash().
 */
static int
db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                      DbUpdateHandle* handle)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    HashDbTerm **bp, *b;
    erts_rwmtx_t* lck;
    int flags = 0;

    ASSERT(tb->common.status & DB_SET);

    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb, hval);
    bp = &BUCKET(tb, hash_to_ix(tb, hval));
    b = *bp;

    /* Search the chain. The loop ends with b == NULL (key not found, bp
     * is the end of the chain) or with b being a pseudo deleted object
     * with the key. A live match jumps straight to Ldone. */
    for (;;) {
        if (b == NULL) {
            break;
        }
        if (has_key(tb, b, key, hval)) {
            if (!is_pseudo_deleted(b)) {
                goto Ldone;
            }
            break;
        }
        bp = &b->next;
        b = *bp;
    }

    /* No live object and no default object to insert */
    if (obj == THE_NON_VALUE) {
        WUNLOCK_HASH(lck);
        return 0;
    }

    {
        Eterm *objp = tuple_val(obj);
        int arity = arityval(*objp);
        Eterm *htop, *hend;

        /* Build a temporary copy of obj on the heap with the key put in */
        ASSERT(arity >= tb->common.keypos);
        htop = HAlloc(p, arity + 1);
        hend = htop + arity + 1;
        sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
        htop[tb->common.keypos] = key;
        obj = make_tuple(htop);

        if (b == NULL) {
            /* Append a new term; nitems is incremented later, in
             * db_finalize_dbterm_hash(), because of DB_INC_TRY_GROW */
            HashDbTerm *q = new_dbterm(tb, obj);

            q->hvalue = hval;
            q->pseudo_deleted = 0;
            q->next = NULL;
            *bp = b = q;
            flags |= DB_INC_TRY_GROW;
        } else {
            /* Reuse the position of the pseudo deleted object */
            HashDbTerm *q, *next = b->next;

            ASSERT(is_pseudo_deleted(b));
            q = replace_dbterm(tb, b, obj);
            q->next = next;
            ASSERT(q->hvalue == hval);
            q->pseudo_deleted = 0;
            *bp = b = q;
            erts_atomic_inc_nob(&tb->common.nitems);
        }

        /* Release the whole temporary heap area again */
        HRelease(p, hend, htop);
        flags |= DB_NEW_OBJECT;
    }

Ldone:
    handle->tb = tbl;
    handle->bp = (void **)bp;
    handle->dbterm = &b->dbterm;
    handle->flags = flags;
    handle->new_size = b->dbterm.size;
    handle->lck = lck;
    return 1;
}

/* Must be called after call to db_lookup_dbterm
**
** cret is the result of the update. If the object was created by the
** lookup (DB_NEW_OBJECT) and cret is not DB_ERROR_NONE, the object is
** removed again. Otherwise a pending resize is completed and the table
** is grown if needed. The bucket lock stored in the handle is released
** on all paths.
*/
static void
db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
{
    DbTable* tbl = handle->tb;
    DbTableHash *tb = &tbl->hash;
    HashDbTerm **bp = (HashDbTerm **) handle->bp;
    HashDbTerm *b = *bp;
    erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
    HashDbTerm* free_me = NULL;   /* term to free after the lock is released */

    ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck));  /* locked by db_lookup_dbterm_hash */

    ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE));

    if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
        /* The update failed: undo the insert done by the lookup */
        if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue),
                                               0)) {
            b->pseudo_deleted = 1;
        } else {
            *bp = b->next;
            free_me = b;
        }

        WUNLOCK_HASH(lck);
        erts_atomic_dec_nob(&tb->common.nitems);
        try_shrink(tb);
    } else {
        if (handle->flags & DB_MUST_RESIZE) {
            /* b is freed below once db_finalize_resize() is done */
            db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
            free_me = b;
        }
        if (handle->flags & DB_INC_TRY_GROW) {
            /* Count the new object and grow the table if it is over
             * the limit and not fixed */
            int nactive;
            int nitems = erts_atomic_inc_read_nob(&tb->common.nitems);
            WUNLOCK_HASH(lck);
            nactive = NACTIVE(tb);

            if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
                grow(tb, nitems);
            }
        } else {
            WUNLOCK_HASH(lck);
        }
    }

    if (free_me)
        free_term(tb, free_me);

#ifdef DEBUG
    handle->dbterm = 0;
#endif
    return;
}

/*
 * Delete all objects in the table within a budget of 'reds' reductions.
 * A fixed table gets all its objects marked as deleted. Otherwise the
 * table is freed and, once that has finished, created anew with an item
 * count of 0.
 * Returns what the function doing the work returned; a negative value
 * means the operation has to be continued.
 */
static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds)
{
    SWord left;

    if (IS_FIXED(tbl)) {
        return db_mark_all_deleted_hash(tbl, reds);
    }

    left = db_free_table_continue_hash(tbl, reds);
    if (left >= 0) {
        /* Everything has been freed: start over with an empty table */
        db_create_hash(p, tbl);
        erts_atomic_set_nob(&tbl->hash.common.nitems, 0);
    }
    return left;
}

/*
 * Call 'func' with the off-heap list of every term in the table.
 * Each list is passed in a temporary ErlOffHeap with overhead 0, and
 * its 'first' field is written back to the term after the call.
 */
void db_foreach_offheap_hash(DbTable *tbl,
			     void (*func)(ErlOffHeap *, void *),
			     void * arg)
{
    DbTableHash *tb = &tbl->hash;
    const int nactive = NACTIVE(tb);
    int slot;

    for (slot = 0; slot < nactive; slot++) {
        HashDbTerm* term;

        for (term = BUCKET(tb,slot); term != NULL; term = term->next) {
            ErlOffHeap tmp_offheap;

            tmp_offheap.first = term->dbterm.first_oh;
            tmp_offheap.overhead = 0;
            (*func)(&tmp_offheap, arg);
            term->dbterm.first_oh = tmp_offheap.first;
        }
    }
}

/*
 * Calculate chain length statistics for the table: min, max, average
 * and standard deviation of the chain lengths, the standard deviation
 * expected from a good hash function, and the number of pseudo deleted
 * objects (kept_items). Pseudo deleted objects count towards the chain
 * lengths.
 */
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
    erts_rwmtx_t* lck;
    int len_sum = 0;
    int len_sq_sum = 0;
    int kept = 0;
    int ix = 0;

    stats->min_chain_len = INT_MAX;
    stats->max_chain_len = 0;

    lck = RLOCK_HASH(tb,ix);
    do {
        HashDbTerm* term;
        int chain_len = 0;

        for (term = BUCKET(tb,ix); term != NULL; term = term->next) {
            ++chain_len;
            if (is_pseudo_deleted(term)) {
                ++kept;
            }
        }
        len_sum += chain_len;
        len_sq_sum += chain_len*chain_len;
        if (chain_len < stats->min_chain_len) {
            stats->min_chain_len = chain_len;
        }
        if (chain_len > stats->max_chain_len) {
            stats->max_chain_len = chain_len;
        }
        ix = next_slot(tb,ix,&lck);
    } while (ix != 0);

    stats->avg_chain_len = (float)len_sum / NACTIVE(tb);
    stats->std_dev_chain_len = sqrt((len_sq_sum - stats->avg_chain_len*len_sum) / NACTIVE(tb));
    /* Expected standard deviation from a good uniform hash function,
       i.e. binomial distribution (not taking the linear hashing into account) */
    stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
    stats->kept_items = kept;
}

/* For testing only */
Eterm erts_ets_hash_sizeof_ext_segtab(void)
{
    return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
}

#ifdef ERTS_ENABLE_LOCK_COUNT
/*
 * Install (enable != 0) or uninstall (enable == 0) lock counting
 * information for all DB_HASH_LOCK_CNT bucket locks of the table.
 * Does nothing for a table without fine grained locks.
 */
void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
    int lock_ix;

    if (tb->locks == NULL) {
        return;
    }

    if (enable) {
        for (lock_ix = 0; lock_ix < DB_HASH_LOCK_CNT; lock_ix++) {
            erts_lcnt_ref_t *ref = &tb->locks->lck_vec[lock_ix].lck.lcnt;

            erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
                ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
        }
    } else {
        for (lock_ix = 0; lock_ix < DB_HASH_LOCK_CNT; lock_ix++) {
            erts_lcnt_ref_t *ref = &tb->locks->lck_vec[lock_ix].lck.lcnt;

            erts_lcnt_uninstall(ref);
        }
    }
}
#endif /* ERTS_ENABLE_LOCK_COUNT */