/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1998-2011. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
** Implementation of unordered ETS tables.
** The tables are implemented as linear dynamic hash tables.
*/

/* SMP:
** The hash table supports two different locking "modes",
** coarse grained and fine grained locking.
**
** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain
** the right kind of lock on the entire table depending on operation (reading
** or writing). No further locking is then done by the table itself.
**
** Fine grained locking is supported by this code to allow concurrent updates
** (and reading) to different parts of the table. This works by keeping one
** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by
** rehashing buckets can be done without exclusive table lock.
**
** A table will support fine grained locking if it is created with flag
** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate
** if operations need to obtain fine grained locks or not. Some operations
** will for example always use exclusive table lock to guarantee
** a higher level of atomicity.
*/

/* FIXATION:
** Fixating the table, by ets:safe_fixtable or as done by select-operations,
** guarantees two things in current implementation.
** (1) Keys will not *totally* disappear from the table. A key can thus be used
**     as an iterator to find the next key in iteration sequence. Note however
**     that this does not mean that (pointers to) table objects are guaranteed
**     to be maintained while the table is fixated. A BAG or DBAG may actually
**     remove objects as long as there is at least one object left in the table
**     with the same key (alive or pseudo-deleted). 
** (2) Objects will not be moved between buckets due to table grow/shrink.
**     This will guarantee that iterations do not miss keys or get double-hits.
**
** With fine grained locking, a concurrent thread can fixate the table at any
** time. A "dangerous" operation (delete or move) therefore needs to check
** if the table is fixated while write-locking the bucket.
*/

/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "export.h"
#include "erl_binary.h"

#include "erl_db_hash.h"

/*
 * Optional debug helpers, enabled by defining MYDEBUG:
 *   IF_DEBUG(x)   expands to x       (otherwise to nothing)
 *   MY_ASSERT(x)  expands to ASSERT(x) (otherwise to nothing)
 */
#ifdef MYDEBUG /* Will fail test case ets_SUITE:memory */
#  define IF_DEBUG(x) x
#  define MY_ASSERT(x) ASSERT(x)
#else
#  define IF_DEBUG(x)
#  define MY_ASSERT(x)
#endif

/* 
 * The following symbols can be manipulated to "tune" the linear hash array 
 */
/* Target bucket chain length. try_shrink() shrinks when the item count
 * drops below nactive*CHAIN_LEN and db_put_hash() grows when it exceeds
 * nactive*(CHAIN_LEN+1). */
#define CHAIN_LEN 6                 /* Medium bucket chain len      */

/* Number of slots per segment */
#define SEGSZ_EXP  8
#define SEGSZ   (1 << SEGSZ_EXP)
#define SEGSZ_MASK (SEGSZ-1)

#define NSEG_1     2 /* Size of first segment table (must be at least 2) */ 
#define NSEG_2   256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */

/* Read the current segment table pointer. With ETHR_ORDERED_READ_DEPEND
 * the no-barrier read (_nob) is used, otherwise the read-barrier variant
 * (_rb) -- presumably because data-dependent reads are then already
 * ordered by the platform; confirm against the ethread documentation. */
#ifdef ETHR_ORDERED_READ_DEPEND
#define SEGTAB(tb) ((struct segment**)erts_smp_atomic_read_nob(&(tb)->segtab))
#else
#define SEGTAB(tb) ((struct segment**)erts_smp_atomic_read_rb(&(tb)->segtab))
#endif
/* Number of active slots and number of items, both read without barrier
 * and truncated to int. */
#define NACTIVE(tb) ((int)erts_smp_atomic_read_nob(&(tb)->nactive))
#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems))

/* Bucket (chain head) of slot i: segment number is i >> SEGSZ_EXP and the
 * position within that segment is i & SEGSZ_MASK. Expands to an lvalue.
 * Note that 'tb' and 'i' are evaluated more than once. */
#define BUCKET(tb, i) SEGTAB(tb)[(i) >> SEGSZ_EXP]->buckets[(i) & SEGSZ_MASK]

/*
 * When deleting a table, the number of records to delete.
 * Approximate number, because we must delete entire buckets.
 */
#define DELETE_RECORD_LIMIT 10000

/*
** Translate a hash value into the index of the slot (bucket) it maps to.
** The caller must already hold the bucket lock (RLOCK_HASH or WLOCK_HASH).
**
** The hash is masked with the current size mask (szm). If the result is
** not below the number of active slots, the mask is halved and applied
** once more.
*/
static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
{
    Uint szm = erts_smp_atomic_read_acqb(&tb->szm);
    Uint slot = hval & szm;

    if (slot < erts_smp_atomic_read_nob(&tb->nactive)) {
	return slot;
    }
    slot &= szm >> 1;
    ASSERT(slot < erts_smp_atomic_read_nob(&tb->nactive));
    return slot;
}

/* Remember a slot containing a pseudo-deleted item (INVALID_HASH).
**
** Allocates a FixedDeletion record for slot 'ix' and pushes it onto the
** front of the list headed by tb->fixdel. The push uses a compare-and-swap
** retry loop, so no lock is taken on the list itself.
*/
static ERTS_INLINE void add_fixed_deletion(DbTableHash* tb, int ix)
{
    erts_aint_t was_next;
    erts_aint_t exp_next;
    FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
							 (DbTable *) tb,
							 sizeof(FixedDeletion));
    ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
    fixd->slot = ix;
    was_next = erts_smp_atomic_read_acqb(&tb->fixdel);
    do { /* Lockless atomic insertion in linked list: */
	exp_next = was_next;
	/* Link to the head we expect, then try to become the new head.
	 * The loop retries for as long as the value handed back by
	 * cmpxchg differs from the expected head. */
	fixd->next = (FixedDeletion*) exp_next;
	was_next = erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
						(erts_aint_t) fixd,
						exp_next);
    }while (was_next != exp_next);
}


/* Hash values produced by MAKE_HASH are reduced modulo MAX_HASH and are
 * therefore always below it. INVALID_HASH lies above MAX_HASH, so it can
 * never collide with a real hash value; it is stored in 'hvalue' to mark
 * a pseudo-deleted object. */
#define MAX_HASH 0xEFFFFFFFUL
#define INVALID_HASH 0xFFFFFFFFUL

/* optimised version of make_hash (normal case? atomic key):
 * for atoms the hash value stored in the atom table entry is reused,
 * for every other term make_hash2() is called. 'term' is evaluated more
 * than once. */
#define MAKE_HASH(term) \
    ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
      make_hash2(term)) % MAX_HASH)

#ifdef ERTS_SMP
#  define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
#  define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)

/*
** Take the fine grained READ lock selected by hval.
** Returns the locked rw-mutex, or NULL if the table is in thread safe
** mode, in which case no bucket lock is taken.
*/
static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
{
    erts_smp_rwmtx_t* bucket_lck;

    if (tb->common.is_thread_safe) {
	return NULL;
    }
    bucket_lck = GET_LOCK(tb,hval);
    ASSERT(tb->common.type & DB_FINE_LOCKED);
    erts_smp_rwmtx_rlock(bucket_lck);
    return bucket_lck;
}

/*
** Take the fine grained WRITE lock selected by hval.
** Returns the locked rw-mutex, or NULL if the table is in thread safe
** mode, in which case no bucket lock is taken.
*/
static ERTS_INLINE erts_smp_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval)
{
    erts_smp_rwmtx_t* bucket_lck;

    if (tb->common.is_thread_safe) {
	return NULL;
    }
    bucket_lck = GET_LOCK(tb,hval);
    ASSERT(tb->common.type & DB_FINE_LOCKED);
    erts_smp_rwmtx_rwlock(bucket_lck);
    return bucket_lck;
}

/* Release a read lock obtained from RLOCK_HASH; a NULL lock is ignored. */
static ERTS_INLINE void RUNLOCK_HASH(erts_smp_rwmtx_t* lck)
{
    if (lck == NULL) {
	return;
    }
    erts_smp_rwmtx_runlock(lck);
}

/* Release a write lock obtained from WLOCK_HASH; a NULL lock is ignored. */
static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck)
{
    if (lck == NULL) {
	return;
    }
    erts_smp_rwmtx_rwunlock(lck);
}
#else /* ERTS_SMP */
/* Without SMP support there are no bucket locks: "locking" yields NULL
 * and unlocking only evaluates its argument. */
# define RLOCK_HASH(tb,hval) NULL 
# define WLOCK_HASH(tb,hval) NULL
# define RUNLOCK_HASH(lck) ((void)lck) 
# define WUNLOCK_HASH(lck) ((void)lck)
#endif /* ERTS_SMP */


/*
 * Lock state predicates. With ERTS_ENABLE_LOCK_CHECK they query the lock
 * checker; a table in thread safe mode always counts as locked
 * (IFN_EXCL short-circuits). Without lock checking they are constant 1.
 * Note that IS_HASH_RLOCKED takes a hash value while the lock-checking
 * IS_HASH_WLOCKED takes the lock pointer itself.
 */
#ifdef ERTS_ENABLE_LOCK_CHECK
#  define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd))
#  define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval)))
#  define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rwlocked(lck))
#  define IS_TAB_WLOCKED(tb) erts_smp_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock)
#else
#  define IS_HASH_RLOCKED(tb,hval) (1)
#  define IS_HASH_WLOCKED(tb,hval) (1)
#  define IS_TAB_WLOCKED(tb) (1)
#endif


/* Iteration helper
** Steps from slot ix to the "next" slot in iteration order and returns its
** index, or 0 when the end of the table (EOT) has been reached.
**
** With SMP, slots guarded by the same lock are visited first (stride
** DB_HASH_LOCK_CNT). When they are exhausted the READ lock in *lck_ptr is
** released and the lock of the following start slot is taken; at EOT no
** lock is held on return.
*/
static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix,
				  erts_smp_rwmtx_t** lck_ptr)
{
#ifdef ERTS_SMP
    Uint candidate = ix + DB_HASH_LOCK_CNT;

    if (candidate >= NACTIVE(tb)) {
	/* Done with this lock's slots, move on to the next lock */
	RUNLOCK_HASH(*lck_ptr);
	candidate = (candidate + 1) & DB_HASH_LOCK_MASK;
	if (candidate != 0) {
	    *lck_ptr = RLOCK_HASH(tb,candidate);
	}
    }
    return candidate;
#else
    ix += 1;
    if (ix < NACTIVE(tb)) {
	return ix;
    }
    return 0;
#endif
}
/* WRITE locking variant of next_slot: identical slot stepping, but the lock
** in *lck_ptr is released and re-acquired as a write lock. Without SMP it
** simply delegates to next_slot.
*/
static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix,
				    erts_smp_rwmtx_t** lck_ptr)
{
#ifdef ERTS_SMP
    Uint candidate = ix + DB_HASH_LOCK_CNT;

    if (candidate >= NACTIVE(tb)) {
	/* Done with this lock's slots, move on to the next lock */
	WUNLOCK_HASH(*lck_ptr);
	candidate = (candidate + 1) & DB_HASH_LOCK_MASK;
	if (candidate != 0) {
	    *lck_ptr = WLOCK_HASH(tb,candidate);
	}
    }
    return candidate;
#else
    return next_slot(tb,ix,lck_ptr);
#endif
}


/*
 * Some special binary flags
 *
 * BIN_FLAG_ALL_OBJECTS is an alias for the user flag BIN_FLAG_USR1.
 * NOTE(review): presumably set on a compiled match program binary to
 * record the "all_objects" property described in struct mp_info --
 * confirm where the flag is set and tested.
 */
#define BIN_FLAG_ALL_OBJECTS         BIN_FLAG_USR1


/* Free one stored object. The dbterm is embedded in the HashDbTerm, so its
 * offset within that struct is handed to db_free_term(). */
static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p)
{
    DbTable* tbl = (DbTable*) tb;

    db_free_term(tbl, p, offsetof(HashDbTerm, dbterm));
}

/*
 * Local types 
 */
/* One "pre-found" bucket: used by struct mp_info to list the buckets to
 * search when keys are given in the match pattern. */
struct mp_prefound {
    HashDbTerm** bucket;	/* Address of the bucket's chain head */
    int ix;			/* Slot index of that bucket */
};

/* Result of analysing a match pattern (filled in by analyze_pattern()). */
struct mp_info {
    int all_objects;		/* True if complete objects are always
				 * returned from the match_spec (can use 
				 * copy_shallow on the return value) */
    int something_can_match;	/* The match_spec is not "impossible" */
    int key_given;		/* NOTE(review): presumably true when the
				 * pattern specifies the key(s) - confirm */
    struct mp_prefound dlists[10];  /* Default list of "pre-found" buckets */
    struct mp_prefound* lists;   /* Buckets to search if keys are given, 
				  * = dlists initially */
    unsigned num_lists;         /* Number of elements in "lists",
				 * = 0 initially */
    Binary *mp;                 /* The compiled match program */
};

/* A table segment: SEGSZ consecutive slots, each holding the head of a
 * bucket chain (see the BUCKET macro for how a slot index is split into
 * segment number and position). */
struct segment {
    HashDbTerm* buckets[SEGSZ];
#ifdef MYDEBUG
    int is_ext_segment;		/* Debug only: marks an ext_segment */
#endif
};

/* A segment that also contains a segment table.
 * 'segtab' is declared with one element but is allocated with room for
 * 'nsegs' entries; use SIZEOF_EXTSEG to compute the allocation size. */
struct ext_segment {    
    struct segment s; /* The segment itself. Must be first */
	
    struct segment** prev_segtab;  /* Used when table is shrinking */
    int nsegs;                     /* Size of segtab */
    struct segment* segtab[1];     /* The segment table */
};
/* Size in bytes of an ext_segment whose segment table has NSEGS entries */
#define SIZEOF_EXTSEG(NSEGS) \
    (offsetof(struct ext_segment,segtab) + sizeof(struct segment*)*(NSEGS))

#if defined(DEBUG) || defined(VALGRIND)
/* Recover the enclosing ext_segment from a pointer to its 'segtab' member */
#  define EXTSEG(SEGTAB_PTR) \
    ((struct ext_segment*) (((char*)(SEGTAB_PTR)) - offsetof(struct ext_segment,segtab)))
#endif


/* Publish 'segtab' as the table's active segment table, using an atomic
 * store with write barrier. Valgrind builds additionally keep a plain
 * pointer to the ext_segment that holds the table. */
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
				   struct segment** segtab)
{
    erts_aint_t new_value = (erts_aint_t) segtab;

    erts_smp_atomic_set_wb(&tb->segtab, new_value);
#ifdef VALGRIND
    tb->top_ptr_to_segment_with_active_segtab = EXTSEG(segtab);
#endif
}


/* How the table segments relate to each other:

    ext_segment:                      ext_segment:              "plain" segment
   #=================#                #================#        #=============#
   | bucket[0]       |<--+   +------->| bucket[256]    |     +->| bucket[512] |
   | bucket[1]       |   |   |        |       [257]    |     |  |       [513] |
   :                 :   |   |        :                :     |  :             :
   | bucket[255]     |   |   |        |       [511]    |     |  |       [767] |
   |-----------------|   |   |        |----------------|     |  #=============#
   | prev_segtab=NULL|   |   |   +--<---prev_segtab    |     |
   | nsegs = 2       |   |   |   |    | nsegs = 256    |     |
+->| segtab[0] -->-------+---|---|--<---segtab[0]      |<-+  |
|  | segtab[1] -->-----------+---|--<---segtab[1]      |  |  |
|  #=================#           |    | segtab[2] -->-----|--+    ext_segment:         
|                                |    :                :  |      #================#
+----------------<---------------+    | segtab[255] ->----|----->| bucket[255*256]| 
                                      #================#  |      |                | 
                                                          |      :                :
                                                          |      |----------------| 
                                                          +----<---prev_segtab    | 
                                                                 :                :
*/


/*
** Forward decl's (static functions)
*/
static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
					 struct segment** old_segtab);
static int alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
			HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key, 
			       HashValue hval, HashDbTerm *list);
static void shrink(DbTableHash* tb, int nactive);
static void grow(DbTableHash* tb, int nactive);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
			   DbTableHash*);
static int analyze_pattern(DbTableHash *tb, Eterm pattern, 
			   struct mp_info *mpi);

/*
 *  Method interface functions
 */
static int db_first_hash(Process *p, 
			 DbTable *tbl, 
			 Eterm *ret);

static int db_next_hash(Process *p, 
			DbTable *tbl, 
			Eterm key,
			Eterm *ret);

static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret);

static int db_get_element_hash(Process *p, DbTable *tbl, 
			       Eterm key, int ndex, Eterm *ret);

static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);

static int db_slot_hash(Process *p, DbTable *tbl, 
			Eterm slot_term, Eterm *ret);

static int db_select_chunk_hash(Process *p, DbTable *tbl, 
				Eterm pattern, Sint chunk_size,
				int reverse, Eterm *ret);
static int db_select_hash(Process *p, DbTable *tbl, 
			  Eterm pattern, int reverse, Eterm *ret);
static int db_select_count_hash(Process *p, DbTable *tbl, 
				Eterm pattern, Eterm *ret);
static int db_select_delete_hash(Process *p, DbTable *tbl, 
				 Eterm pattern, Eterm *ret);

static int db_select_continue_hash(Process *p, DbTable *tbl, 
				   Eterm continuation, Eterm *ret);

static int db_select_count_continue_hash(Process *p, DbTable *tbl, 
					 Eterm continuation, Eterm *ret);

static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
					  Eterm continuation, Eterm *ret);
static void db_print_hash(int to,
			  void *to_arg,
			  int show,
			  DbTable *tbl);
static int db_free_table_hash(DbTable *tbl);

static int db_free_table_continue_hash(DbTable *tbl);


static void db_foreach_offheap_hash(DbTable *,
				    void (*)(ErlOffHeap *, void *),
				    void *);

static int db_delete_all_objects_hash(Process* p, DbTable* tbl);
#ifdef HARDDEBUG
static void db_check_table_hash(DbTableHash *tb);
#endif
static int db_lookup_dbterm_hash(DbTable *tbl, Eterm key, DbUpdateHandle* handle);
static void db_finalize_dbterm_hash(DbUpdateHandle* handle);

/* Shrink the table if it is worthwhile and allowed: more than SEGSZ slots
 * must be active, the item count must be below nactive*CHAIN_LEN and the
 * table must not be fixed. */
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
    int nactive = NACTIVE(tb);

    if (nactive <= SEGSZ) {
	return;
    }
    if (NITEMS(tb) >= (nactive * CHAIN_LEN)) {
	return;
    }
    if (IS_FIXED(tb)) {
	return;
    }
    shrink(tb, nactive);
}

/* Term equality where 'x' is given with a NULL base and 'y' with base
 * 'y_base'. Terms that is_same() reports as identical are equal at once;
 * otherwise eq_rel() is consulted, but only when is_not_both_immed()
 * holds. Every argument is parenthesized so that arbitrary expressions
 * can be passed safely. Arguments are evaluated more than once, so they
 * must be free of side effects. */
#define EQ_REL(x,y,y_base) \
    (is_same((x),NULL,(y),(y_base)) \
     || (is_not_both_immed((x),(y)) && eq_rel((x),NULL,(y),(y_base))))

/* Is b a live object (not pseudo-deleted) with the specified key?
** The stored hash value must equal hval exactly, so an object marked with
** INVALID_HASH never matches.
*/
static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b,
				    Eterm key, HashValue hval)
{
    Eterm stored_key;

    if (b->hvalue != hval) {
	return 0;
    }
    stored_key = GETKEY(tb, b->dbterm.tpl);
    ASSERT(!is_header(stored_key));
    return EQ_REL(key, stored_key, b->dbterm.tpl);
}

/* Does b have the specified key? Unlike has_live_key, a pseudo-deleted
** object (hash value INVALID_HASH) is accepted as well, as long as its key
** compares equal.
*/
static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b,
			       Eterm key, HashValue hval)
{
    Eterm stored_key;

    if (!(b->hvalue == hval || b->hvalue == INVALID_HASH)) {
	return 0;
    }
    stored_key = GETKEY(tb, b->dbterm.tpl);
    ASSERT(!is_header(stored_key));
    return EQ_REL(key, stored_key, b->dbterm.tpl);
}

/* Store obj as a brand new HashDbTerm, using the compressing store routine
 * when the table has compression enabled. Returns the new term. */
static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj)
{
    if (tb->common.compress) {
	return db_store_term_comp(&tb->common, NULL,
				  offsetof(HashDbTerm,dbterm), obj);
    }
    return db_store_term(&tb->common, NULL,
			 offsetof(HashDbTerm,dbterm), obj);
}

/* Store obj in place of the existing term 'old' (must not be NULL), using
 * the compressing store routine when the table has compression enabled.
 * Returns the term that now holds obj; callers must use the returned
 * pointer rather than 'old'. */
static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old,
					      Eterm obj)
{
    ASSERT(old != NULL);
    if (tb->common.compress) {
	return db_store_term_comp(&tb->common, &(old->dbterm),
				  offsetof(HashDbTerm,dbterm), obj);
    }
    return db_store_term(&tb->common, &(old->dbterm),
			 offsetof(HashDbTerm,dbterm), obj);
}



/*
** External interface 
*/
/*
 * Method table for hash tables. This is a positional initializer: the
 * entries must stay in exactly the order of the members of DbTableMethod
 * (declared elsewhere). A hash table is unordered, so the same functions
 * serve as first/last and as next/prev.
 */
DbTableMethod db_hash =
{
    db_create_hash,
    db_first_hash,
    db_next_hash,
    db_first_hash,   /* last == first  */
    db_next_hash,    /* prev == next   */
    db_put_hash,
    db_get_hash,
    db_get_element_hash,
    db_member_hash,
    db_erase_hash,
    db_erase_object_hash,
    db_slot_hash,
    db_select_chunk_hash,
    db_select_hash,
    db_select_delete_hash,
    db_select_continue_hash, /* hmm continue_hash? */
    db_select_delete_continue_hash,
    db_select_count_hash,
    db_select_count_continue_hash,
    db_delete_all_objects_hash,
    db_free_table_hash,
    db_free_table_continue_hash,
    db_print_hash,
    db_foreach_offheap_hash,
#ifdef HARDDEBUG
    db_check_table_hash,
#else
    NULL,	/* no table check function unless HARDDEBUG */
#endif
    db_lookup_dbterm_hash,
    db_finalize_dbterm_hash
};

#ifdef DEBUG
/* Wait a while to provoke race and get code coverage.
 * The counter is volatile: an empty loop over a plain local has no
 * observable effect and may be removed entirely by an optimizing compiler,
 * which would silently turn this delay into a no-op. */
static void DEBUG_WAIT(void)
{
    volatile unsigned long spin = 1UL << 20;
    while (--spin)
	;
}
#else
#  define DEBUG_WAIT()
#endif

/* Rare case of restoring the rest of the fixdel list
   when "unfixer" gets interrupted by "fixer".

   'fixdel' is the not yet processed remainder of the list that
   db_unfix_table_hash() detached from tb->fixdel. If tb->fixdel is still
   NULL the remainder is simply put back. Otherwise new entries have been
   added in the meantime and the two lists are joined: the current list is
   appended after the last element of 'fixdel' and 'fixdel' becomes the
   head. */ 
static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
{
    /*int tries = 0;*/
    DEBUG_WAIT();
    /* Fast path: list head is still NULL, install our remainder */
    if (erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
				     (erts_aint_t) fixdel,
				     (erts_aint_t) NULL) != (erts_aint_t) NULL) {
	/* Oboy, must join lists */    
	FixedDeletion* last = fixdel;
	erts_aint_t was_tail;
	erts_aint_t exp_tail;

	/* Find the last element of our own list */
	while (last->next != NULL) last = last->next;
	was_tail = erts_smp_atomic_read_acqb(&tb->fixdel);
	do { /* Lockless atomic list insertion */
	    exp_tail = was_tail;
	    /* Hook the current list after ours, then try to swap in our
	     * head; retried while the head changed under us */
	    last->next = (FixedDeletion*) exp_tail;
	    /*++tries;*/
	    DEBUG_WAIT();
	    was_tail = erts_smp_atomic_cmpxchg_relb(&tb->fixdel,
						    (erts_aint_t) fixdel,
						    exp_tail);
	}while (was_tail != exp_tail);
    }
    /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/
}
/*
** Table interface routines ie what's called by the bif's 
*/

/*
** Called when the table is no longer fixed: physically removes all
** pseudo-deleted objects (hash value INVALID_HASH) from the slots recorded
** in the tb->fixdel list and frees the list records.
**
** The whole list is detached atomically first. If the table turns out to
** be fixed again while a slot is write locked, the unprocessed remainder
** is handed back with restore_fixdel() and the function returns, unless
** the table has been unfixed once more, in which case it starts over.
*/
void db_unfix_table_hash(DbTableHash *tb)
{
    FixedDeletion* fixdel;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
		       || (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock)
			   && !tb->common.is_thread_safe));
restart:
    /* Detach the entire list, leaving tb->fixdel empty */
    fixdel = (FixedDeletion*) erts_smp_atomic_xchg_acqb(&tb->fixdel,
							(erts_aint_t) NULL);
    while (fixdel != NULL) {
	FixedDeletion *fx = fixdel;
	int ix = fx->slot;
	HashDbTerm **bp;
	HashDbTerm *b;
	erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,ix);

	if (IS_FIXED(tb)) { /* interrupted by fixer */
	    WUNLOCK_HASH(lck);
	    restore_fixdel(tb,fixdel);
	    if (!IS_FIXED(tb)) {
		goto restart; /* unfixed again! */
	    }
	    return;
	}
	if (ix < NACTIVE(tb)) {
	    bp = &BUCKET(tb, ix);
	    b = *bp;
	    
	    /* Unlink and free every pseudo-deleted object in the chain */
	    while (b != NULL) {
		if (b->hvalue == INVALID_HASH) {
		    *bp = b->next;
		    free_term(tb, b);
		    b = *bp;
		} else {
		    bp = &b->next;
		    b = b->next;
		}
	    }
	}
	/* else slot has been joined and purged by shrink() */
	WUNLOCK_HASH(lck);
	/* Advance before the record itself is freed */
	fixdel = fx->next;
	erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
		     (DbTable *) tb,
		     (void *) fx,
		     sizeof(FixedDeletion));
	ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
    }

    /* ToDo: Maybe try grow/shrink the table as well */
}

/* Only used by tests
** Walks every slot of the table and returns the number of pseudo-deleted
** objects (hash value INVALID_HASH) that are still kept in the chains.
*/
Uint db_kept_items_hash(DbTableHash *tb)
{
    Uint n_kept = 0;
    Uint slot = 0;
    erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,slot);

    do {
	HashDbTerm* term = BUCKET(tb, slot);

	while (term != NULL) {
	    if (term->hvalue == INVALID_HASH) {
		n_kept++;
	    }
	    term = term->next;
	}
	/* next_slot drops the lock itself when it reports EOT (0) */
	slot = next_slot(tb, slot, &lck);
    } while (slot != 0);
    return n_kept;
}

/*
** Initialize the hash specific part of a newly created table.
** The table starts with SEGSZ active slots in one ext_segment whose segment
** table has NSEG_1 entries. With SMP and DB_FINE_LOCKED, the vector of
** DB_HASH_LOCK_CNT bucket rw-mutexes is allocated and initialized as well.
** Always returns DB_ERROR_NONE.
*/
int db_create_hash(Process *p, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;

    erts_smp_atomic_init_nob(&tb->szm, SEGSZ_MASK);
    erts_smp_atomic_init_nob(&tb->nactive, SEGSZ);
    erts_smp_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
    erts_smp_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
    SET_SEGTAB(tb, alloc_ext_seg(tb,0,NULL)->segtab);
    tb->nsegs = NSEG_1;
    tb->nslots = SEGSZ;

    erts_smp_atomic_init_nob(&tb->is_resizing, 0);
#ifdef ERTS_SMP
    if (tb->common.type & DB_FINE_LOCKED) {
	erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
	int i;
	if (tb->common.type & DB_FREQ_READ)
	    rwmtx_opt.type = ERTS_SMP_RWMTX_TYPE_FREQUENT_READ;
	/* NOTE(review): the result of erts_db_alloc_fnf() is used below
	 * without a NULL check. If that allocator can return NULL on
	 * failure, this needs error handling - confirm its contract. */
	tb->locks = (DbTableHashFineLocks*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, /* Other type maybe? */ 
							      (DbTable *) tb,
							      sizeof(DbTableHashFineLocks));	    	    
	for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
	    erts_smp_rwmtx_init_opt_x(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
				      "db_hash_slot", make_small(i));
	}
	/* This important property is needed to guarantee that the buckets
	 * involved in a grow/shrink operation are protected by the same
	 * lock:
	 */
	ASSERT(erts_smp_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
    }
    else { /* coarse locking */
	tb->locks = NULL;
    }
    ERTS_THR_MEMORY_BARRIER;
#endif /* ERTS_SMP */
    return DB_ERROR_NONE;
}

/*
** Find the first live object in iteration order and return a copy of its
** key in *ret, or am_EOT if there is none. Always returns DB_ERROR_NONE.
*/
static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Uint ix = 0;
    erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix);
    HashDbTerm* first = BUCKET(tb,ix);

    /* Skip empty slots; next_slot returns 0 (and unlocks) at EOT */
    while (first == NULL) {
	ix = next_slot(tb,ix,&lck);
	if (ix == 0) {
	    break;
	}
	first = BUCKET(tb,ix);
    }
    /* Head of chain is pseudo-deleted, let next() find a live object */
    if (first != NULL && first->hvalue == INVALID_HASH) {
	first = next(tb,&ix,&lck,first);
    }

    if (first == NULL) {
	*ret = am_EOT;
    }
    else {
	*ret = db_copy_key(p, tbl, &first->dbterm);
	RUNLOCK_HASH(lck);
    }
    return DB_ERROR_NONE;
}


/*
** Return in *ret a copy of the key following 'key' in iteration order, or
** am_EOT if 'key' was the last one. 'key' may belong to a pseudo-deleted
** object. Returns DB_ERROR_BADKEY if no object (live or pseudo-deleted)
** with that key exists, otherwise DB_ERROR_NONE.
*/
static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval = MAKE_HASH(key);
    erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,hval);
    Uint ix = hash_to_ix(tb, hval);
    HashDbTerm* cur = BUCKET(tb, ix);

    /* Locate the object we are iterating from */
    while (cur != NULL && !has_key(tb, cur, key, hval)) {
	cur = cur->next;
    }
    if (cur == NULL) {
	RUNLOCK_HASH(lck);
	return DB_ERROR_BADKEY;
    }

    /* Key found */
    cur = next(tb, &ix, &lck, cur);
    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
	/* Step past remaining live objects sharing the same key */
	while (cur != NULL && has_live_key(tb, cur, key, hval)) {
	    cur = next(tb, &ix, &lck, cur);
	}
    }

    if (cur != NULL) {
	*ret = db_copy_key(p, tbl, &cur->dbterm);
	RUNLOCK_HASH(lck);
    }
    else {
	*ret = am_EOT;
    }
    return DB_ERROR_NONE;
}

/*
** Insert 'obj' into the table.
**
** key_clash_fail: when non-zero, nothing is stored and DB_ERROR_BADKEY is
**                 returned if a live object with the same key exists.
** Returns DB_ERROR_NONE, or DB_ERROR_BADKEY on key clash.
**
** Behaviour when the key is already present (live or pseudo-deleted):
**  - DB_SET: the existing object is replaced; a pseudo-deleted one is
**    revived, which counts as a new item.
**  - DB_BAG: an identical live object makes this a no-op; an identical
**    pseudo-deleted object is revived and moved first among the objects
**    with that key. Otherwise a new object is inserted.
**  - otherwise (DB_DUPLICATE_BAG): a new object is always inserted.
** New objects are linked in front of the first object with the same key,
** or at the end of the chain when the key is absent. After inserting a new
** object the table is grown if the item count exceeds
** nactive*(CHAIN_LEN+1) and the table is not fixed.
*/
int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    Eterm key;
    HashDbTerm** bp;
    HashDbTerm* b;
    HashDbTerm* q;
    erts_smp_rwmtx_t* lck;
    int nitems;
    int ret = DB_ERROR_NONE;

    key = GETKEY(tb, tuple_val(obj));
    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    /* Search the chain; bp always addresses the link that points to b */
    for (;;) {
	if (b == NULL) {
	    goto Lnew;
	}
	if (has_key(tb,b,key,hval)) {
	    break;
	}
	bp = &b->next;
	b = b->next;
    }
    /* Key found
    */
    if (tb->common.status & DB_SET) {
	HashDbTerm* bnext = b->next;
	if (b->hvalue == INVALID_HASH) {
	    erts_smp_atomic_inc_nob(&tb->common.nitems);
	}
	else if (key_clash_fail) {
	    ret = DB_ERROR_BADKEY;
	    goto Ldone;
	}
	/* The term returned may differ from b, so relink it */
	q = replace_dbterm(tb, b, obj);
	q->next = bnext;
	q->hvalue = hval; /* In case of INVALID_HASH */
	*bp = q;
	goto Ldone;
    }
    else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */
	/* Fail if any object with this key is live; if all of them are
	 * pseudo-deleted, fall through and insert */
	q = b;
	do {
	    if (q->hvalue != INVALID_HASH) {
		ret = DB_ERROR_BADKEY;
		goto Ldone;
	    }
	    q = q->next;
	}while (q != NULL && has_key(tb,q,key,hval)); 	
    }
    else if (tb->common.status & DB_BAG) {
	/* Look for an identical object among those with the same key;
	 * qp addresses the link that points to q */
	HashDbTerm** qp = bp;
	q = b;
	do {
	    if (db_eq(&tb->common,obj,&q->dbterm)) {
		if (q->hvalue == INVALID_HASH) {
		    erts_smp_atomic_inc_nob(&tb->common.nitems);
		    q->hvalue = hval;
		    if (q != b) { /* must move to preserve key insertion order */
			*qp = q->next;
			q->next = b;
			*bp = q;
		    }
		}
		goto Ldone;
	    }
	    qp = &q->next;
	    q = *qp;
	}while (q != NULL && has_key(tb,q,key,hval)); 
    }
    /*else DB_DUPLICATE_BAG */

Lnew:
    q = new_dbterm(tb, obj);
    q->hvalue = hval;
    q->next = b;
    *bp = q;
    nitems = erts_smp_atomic_inc_read_nob(&tb->common.nitems);
    WUNLOCK_HASH(lck);
    {
	/* Growing is decided after the bucket lock has been released */
	int nactive = NACTIVE(tb);       
	if (nitems > nactive * (CHAIN_LEN+1) && !IS_FIXED(tb)) {
	    grow(tb, nactive);
	}
    }
    CHECK_TABLES();
    return DB_ERROR_NONE;

Ldone:
    WUNLOCK_HASH(lck);	
    return ret;
}

/*
** Look up 'key' and return in *ret a list built by build_term_list() from
** the matching object(s), or NIL if no live object has that key.
** For bags the list range covers all consecutive objects with the key.
** Always returns DB_ERROR_NONE.
*/
int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval = MAKE_HASH(key);
    erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,hval);
    int ix = hash_to_ix(tb, hval);
    HashDbTerm* first = BUCKET(tb, ix);

    /* Find the first live object with the key */
    while (first != NULL && !has_live_key(tb,first,key,hval)) {
	first = first->next;
    }

    if (first == NULL) {
	*ret = NIL;
    }
    else {
	HashDbTerm* end = first->next;	/* one past the last object */
	Eterm result;

	if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
	    for (; end != NULL && has_key(tb,end,key,hval); end = end->next) {
		/* extend the range over all objects with the key */
	    }
	}
	result = build_term_list(p, first, end, tb);
	CHECK_TABLES();
	*ret = result;
    }
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}

/*
** Fetch element number 'ndex' of the object(s) stored under 'key' into the
** caller supplied array 'ret'. The elements are taken directly from the
** stored tuples (no copy is made here).
**
** num_ret: on entry the capacity of 'ret', on successful return the number
**          of elements written.
** Returns DB_ERROR_NONE on success, DB_ERROR_BADKEY if no live object has
** the key, or DB_ERROR_BADITEM if a subsequent object with the same key
** has fewer than 'ndex' elements. Fixed tables are not supported.
**
** NOTE(review): the arity of the first matching object is not checked
** against 'ndex' (only the following ones are, and none in the non-bag
** case) - presumably callers guarantee it; confirm.
*/
int db_get_element_array(DbTable *tbl, 
			 Eterm key,
			 int ndex, 
			 Eterm *ret,
			 int *num_ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    int num = 0;
    int retval;
    erts_smp_rwmtx_t* lck;

    ASSERT(!IS_FIXED(tbl)); /* no support for fixed tables here */

    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    b1 = BUCKET(tb, ix);

    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
		HashDbTerm* b;
		HashDbTerm* b2 = b1->next;

		/* Find the end (b2) of the run of objects with this key,
		 * validating their arity on the way */
		while(b2 != NULL && has_live_key(tb,b2,key,hval)) {
		    if (ndex > arityval(b2->dbterm.tpl[0])) {
			retval = DB_ERROR_BADITEM;
			goto done;
		    }
		    b2 = b2->next;
		}

		b = b1;
		while(b != b2) {
		    if (num < *num_ret) {
			ret[num++] = b->dbterm.tpl[ndex];
		    } else {
			/* Array is full: stop early. *num_ret is left
			 * unchanged, which equals num at this point */
			retval = DB_ERROR_NONE;
			goto done;
		    }
		    b = b->next;
		}
		*num_ret = num;
	    }
	    else {
		ASSERT(*num_ret > 0);
		ret[0] = b1->dbterm.tpl[ndex];
		*num_ret = 1;
	    }
	    retval = DB_ERROR_NONE;
	    goto done;
	}
	b1 = b1->next;
    }
    retval = DB_ERROR_BADKEY;
done:
    RUNLOCK_HASH(lck);
    return retval;
}
    
    
/*
** Membership test: sets *ret to am_true if the table holds at least one
** live (not pseudo-deleted) object with the given key, otherwise am_false.
** Always returns DB_ERROR_NONE.
*/
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_smp_rwmtx_t* lck;

    hval = MAKE_HASH(key);
    /* The bucket lock must be held BEFORE the slot index is calculated.
     * hash_to_ix() reads the size mask and the number of active slots,
     * and with fine grained locking a concurrent grow/shrink may change
     * them; an index computed without the lock can point at the wrong
     * bucket. */
    lck = RLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    b1 = BUCKET(tb, ix);

    while(b1 != NULL) {
	if (has_live_key(tb,b1,key,hval)) {
	    *ret = am_true;
	    goto done;
	}
	b1 = b1->next;
    }
    *ret = am_false;
done:
    RUNLOCK_HASH(lck);
    return DB_ERROR_NONE;
}
    
/*
** Return in *ret a copy of element number 'ndex' of the object stored
** under 'key'. For bags the result is a list with that element from every
** live object with the key; the list is built by consing onto the front.
**
** Returns DB_ERROR_NONE on success, DB_ERROR_BADKEY if no live object has
** the key, or DB_ERROR_BADITEM if a live object with the key has fewer
** than 'ndex' elements.
*/
static int db_get_element_hash(Process *p, DbTable *tbl, 
			       Eterm key,
			       int ndex, 
			       Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm* b1;
    erts_smp_rwmtx_t* lck;
    int retval;
    
    hval = MAKE_HASH(key);
    lck = RLOCK_HASH(tb, hval);
    ix = hash_to_ix(tb, hval);
    b1 = BUCKET(tb, ix);


    while(b1 != 0) {
	if (has_live_key(tb,b1,key,hval)) {
	    if (ndex > arityval(b1->dbterm.tpl[0])) {
		retval = DB_ERROR_BADITEM;
		goto done;
	    }
	    if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
		HashDbTerm* b;
		HashDbTerm* b2 = b1->next;
		Eterm elem_list = NIL;

		/* First pass: find the end (b2) of the run of objects
		 * with this key and validate the arity of the live ones
		 * before anything is copied */
		while(b2 != NULL && has_key(tb,b2,key,hval)) {
		    if (ndex > arityval(b2->dbterm.tpl[0])
			&& b2->hvalue != INVALID_HASH) {
			retval = DB_ERROR_BADITEM;
			goto done;
		    }
		    b2 = b2->next;
		}
		/* Second pass: copy the element from each live object.
		 * NOTE(review): the last argument (2) presumably asks
		 * for two extra heap words at hp for the cons cell -
		 * confirm against db_copy_element_from_ets */
		b = b1;
		while(b != b2) {
		    if (b->hvalue != INVALID_HASH) {
			Eterm *hp;
			Eterm copy = db_copy_element_from_ets(&tb->common, p,
							      &b->dbterm, ndex, &hp, 2);
			elem_list = CONS(hp, copy, elem_list);
			hp += 2;
		    }
		    b = b->next;
		}
		*ret = elem_list;
	    }
	    else {
		Eterm* hp;
		*ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0);
	    }
	    retval = DB_ERROR_NONE;
	    goto done;
	}
	b1 = b1->next;
    }
    retval = DB_ERROR_BADKEY;
done:
    RUNLOCK_HASH(lck);
    return retval;
}

/*
 * Very internal interface, removes elements of arity two from 
 * BAG. Used for the PID meta table
 *
 * Removes at most one object: the first live object with key 'key' that
 * is a 2-tuple whose second element equals 'value'. The table must be a
 * BAG, must not be fixed and must not be compressed (asserted).
 * Always returns DB_ERROR_NONE. A shrink is attempted whenever a live
 * object with the key was seen, even if nothing was removed.
 */
int db_erase_bag_exact2(DbTable *tbl, Eterm key, Eterm value)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm** bp;
    HashDbTerm* b;
    erts_smp_rwmtx_t* lck;
    int found = 0;

    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    ASSERT(!IS_FIXED(tb));
    ASSERT((tb->common.status & DB_BAG));
    ASSERT(!tb->common.compress);

    /* bp always addresses the link that points to b */
    while(b != 0) {
	if (has_live_key(tb,b,key,hval)) {
	    found = 1;
	    if ((arityval(b->dbterm.tpl[0]) == 2) && 
		EQ(value, b->dbterm.tpl[2])) {
		*bp = b->next;
		free_term(tb, b);
		erts_smp_atomic_dec_nob(&tb->common.nitems);
		b = *bp;
		break;
	    }
	} else if (found) {
		/* Past the run of objects with this key */
		break;
	}
	bp = &b->next;
	b = b->next;
    }
    WUNLOCK_HASH(lck);
    if (found) {
	try_shrink(tb);
    }
    return DB_ERROR_NONE;
}
	
/*
** NB, this is for the db_erase/2 bif.
**
** Removes every live object with the given key. If the table is fixed,
** the first such object is only pseudo-deleted (marked with INVALID_HASH
** and its slot recorded with add_fixed_deletion) so that the key remains
** usable for iteration; any further objects with the key are removed for
** real. *ret is always set to am_true and DB_ERROR_NONE is returned.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm** bp;
    HashDbTerm* b;
    erts_smp_rwmtx_t* lck;
    int nitems_diff = 0;	/* minus the number of objects erased */

    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    /* bp always addresses the link that points to b */
    while(b != 0) {
	if (has_live_key(tb,b,key,hval)) {
	    --nitems_diff;
	    if (nitems_diff == -1 && IS_FIXED(tb)) {
		/* Pseudo remove (no need to keep several of same key) */
		add_fixed_deletion(tb, ix);
		b->hvalue = INVALID_HASH;
	    } else {
		*bp = b->next;
		free_term(tb, b);
		b = *bp;
		continue;
	    }
	}
	else {
	    /* A live object with another key after at least one hit:
	     * the run of objects with our key has ended */
	    if (nitems_diff && b->hvalue != INVALID_HASH)
		break;
	}
	bp = &b->next;
	b = b->next;
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
	erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
	try_shrink(tb);
    }
    *ret = am_true;
    return DB_ERROR_NONE;
}    

/*
** This is for the ets:delete_object BIF
**
** Removes the live object(s) equal to 'object' (compared with db_eq).
** For a DB_DUPLICATE_BAG all equal objects are removed, for other table
** types the search stops after the first one. If the table is fixed and
** the object to remove is the first live object seen with that key, it is
** only pseudo-deleted (marked with INVALID_HASH and its slot recorded).
** *ret is always set to am_true and DB_ERROR_NONE is returned.
*/
static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval;
    int ix;
    HashDbTerm** bp;
    HashDbTerm* b;
    erts_smp_rwmtx_t* lck;
    int nitems_diff = 0;	/* minus the number of objects erased */
    int nkeys = 0;		/* live objects seen with the key */
    Eterm key;

    key = GETKEY(tb, tuple_val(object));
    hval = MAKE_HASH(key);
    lck = WLOCK_HASH(tb,hval);
    ix = hash_to_ix(tb, hval);
    bp = &BUCKET(tb, ix);
    b = *bp;

    /* bp always addresses the link that points to b */
    while(b != 0) {
	if (has_live_key(tb,b,key,hval)) {
	    ++nkeys;
	    if (db_eq(&tb->common,object, &b->dbterm)) {
		--nitems_diff;
		if (nkeys==1 && IS_FIXED(tb)) { /* Pseudo remove */
		    add_fixed_deletion(tb,ix);
		    b->hvalue = INVALID_HASH;
		    bp = &b->next;
		    b = b->next;
		} else {
		    *bp = b->next;
		    free_term(tb, b);
		    b = *bp;
		}
		/* b has already been advanced in both branches above */
		if (tb->common.status & (DB_DUPLICATE_BAG)) {
		    continue;
		} else {
		    break;
		}
	    }
	}
	else if (nitems_diff && b->hvalue != INVALID_HASH) {
	    /* A live object with another key after at least one hit */
	    break;
	}
	bp = &b->next;
	b = b->next;
    }
    WUNLOCK_HASH(lck);
    if (nitems_diff) {
	erts_smp_atomic_add_nob(&tb->common.nitems, nitems_diff);
	try_shrink(tb);
    }
    *ret = am_true;
    return DB_ERROR_NONE;
}    


/*
** Fetch the contents of one bucket, addressed by its slot number.
**
** slot_term must be a non-negative small integer. A slot below the number
** of active buckets yields the list of live objects in that bucket, the
** slot equal to the number of active buckets yields am_EOT, and anything
** else is rejected with DB_ERROR_BADPARAM.
*/
static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    erts_smp_rwmtx_t* lck;
    Sint slot;
    int nactive;
    int retval = DB_ERROR_NONE;

    if (is_not_small(slot_term)) {
	return DB_ERROR_BADPARAM;
    }
    slot = signed_val(slot_term);
    if (slot < 0) {
	return DB_ERROR_BADPARAM;
    }

    lck = RLOCK_HASH(tb, slot);
    nactive = NACTIVE(tb);
    if (slot > nactive) {
	retval = DB_ERROR_BADPARAM;
    }
    else if (slot == nactive) {
	*ret = am_EOT;
    }
    else {
	*ret = build_term_list(p, BUCKET(tb, slot), 0, tb);
    }
    RUNLOCK_HASH(lck);
    return retval;
}


/*
 * Function wrapper around the BIF_TRAP1 macro. The BIF_TRAP macros return
 * from the enclosing function on their own, so wrapping the macro in a
 * function makes the value sent during a trap available to the caller as
 * an ordinary return value.
 */
static BIF_RETTYPE bif_trap1(Export *bif, Process *p, Eterm p1)
{
    BIF_TRAP1(bif, p, p1);
}
    
/*
 * Continue collecting select matches. This happens either after a trap
 * or when the user calls ets:select/1 with a continuation.
 *
 * 'continuation' must be a tuple; it is accepted only if it is a 6-tuple
 *   {Passthrough, SlotIx, ChunkSize, MatchProgBinary, MatchList, Got}
 * where Passthrough is copied unchanged into any new continuation, a
 * negative SlotIx means end of table, ChunkSize 0 means "no chunking",
 * MatchList holds the matches collected so far and Got is their count.
 *
 * Returns DB_ERROR_BADPARAM (with *ret = NIL) for a malformed continuation.
 * Otherwise returns DB_ERROR_NONE with *ret set to the final result, a
 * {Matches, Continuation} pair, am_EOT, or the value of a trap to
 * ets_select_continue_exp.
 */
static int db_select_continue_hash(Process *p, 
				   DbTable *tbl, 
				   Eterm continuation, 
				   Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Sint slot_ix; 
    Sint save_slot_ix;
    Sint chunk_size;
    int all_objects;
    Binary *mp;
    int num_left = 1000;	/* work budget before trapping */
    HashDbTerm *current = 0;
    Eterm match_list;
    Eterm *hp;
    Eterm match_res;
    Sint got;
    Eterm *tptr;
    erts_smp_rwmtx_t* lck;

/* No trailing semicolon: the call site supplies it, which keeps the macro
 * usable as a single statement (e.g. as an unbraced if/else branch). */
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0)

    /* Decode continuation. We know it's a tuple but not the arity or anything else */

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 6)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    
    if (!is_small(tptr[2]) || !is_small(tptr[3]) || !is_binary(tptr[4]) || 
	!(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    if ((chunk_size = signed_val(tptr[3])) < 0)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    /* The binary must be a reference counted binary wrapping a match program */
    if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    mp = ((ProcBin *) binary_val(tptr[4]))->val;
    if (!IsMatchProgBinary(mp))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS;
    match_list = tptr[5];
    if ((got = signed_val(tptr[6])) < 0)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);

    slot_ix = signed_val(tptr[2]);
    if (slot_ix < 0 /* EOT */ 
	|| (chunk_size && got >= chunk_size)) {       
	goto done; /* Already got all or enough in the match_list */
    }

    lck = RLOCK_HASH(tb,slot_ix);
    if (slot_ix >= NACTIVE(tb)) {
	RUNLOCK_HASH(lck);	
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    }

    /* Skip empty buckets until a non-empty one is found.
     * NOTE(review): no explicit unlock follows next_slot() returning 0, so
     * presumably next_slot() releases the lock in that case -- confirm. */
    while ((current = BUCKET(tb,slot_ix)) == NULL) {
	slot_ix = next_slot(tb, slot_ix, &lck);
	if (slot_ix == 0) {
	    slot_ix = -1; /* EOT */
	    goto done;	   
	}
    }	  
    for(;;) {
	/* Pseudo deleted terms (INVALID_HASH) are never matched */
	if (current->hvalue != INVALID_HASH && 
	    (match_res = db_match_dbterm(&tb->common, p, mp, all_objects,
					 &current->dbterm, &hp, 2),
	     is_value(match_res))) {

	    match_list = CONS(hp, match_res, match_list);
	    ++got;
	}

	--num_left;
	save_slot_ix = slot_ix;
	/* NOTE(review): as with next_slot(), next() returning NULL is not
	 * followed by an unlock here -- presumably it unlocks itself. */
	if ((current = next(tb, (Uint*)&slot_ix, &lck, current)) == NULL) {
	    slot_ix = -1; /* EOT */
	    break;
	}
	/* Chunk limit and trap condition are only checked on bucket change */
	if (slot_ix != save_slot_ix) { 
	    if (chunk_size && got >= chunk_size) {
		RUNLOCK_HASH(lck);
		break;
	    }    
	    if (num_left <= 0 || MBUF(p)) {
		/*
		 * We have either reached our limit, or just created some heap fragments.
		 * Since many heap fragments will make the GC slower, trap and GC now.
		 */
		RUNLOCK_HASH(lck);
		goto trap;
	    }
	}
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    if (chunk_size) {
	Eterm continuation;
	Eterm rest = NIL;
	Sint rest_size = 0;

	if (got > chunk_size) { /* Cannot write destructively here, 
				   the list may have 
				   been in user space */
	    /* Copy the surplus (got - chunk_size) leading elements into a
	     * fresh list 'rest'; what remains of match_list is returned. */
	    rest = NIL;
	    hp = HAlloc(p, (got - chunk_size) * 2); 
	    while (got-- > chunk_size) {
		rest = CONS(hp, CAR(list_val(match_list)), rest);
		hp += 2;
		match_list = CDR(list_val(match_list));
		++rest_size;
	    }
	}
	if (rest != NIL || slot_ix >= 0) {
	    /* More to deliver later: return {Matches, Continuation} */
	    hp = HAlloc(p,3+7);
	    continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), 
				  tptr[3], tptr[4], rest, 
				  make_small(rest_size));
	    hp += 7;
	    RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
	} else {
	    if (match_list != NIL) {
		hp = HAlloc(p, 3);
		RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
	    } else {
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    }
	}
    }
    /* Not chunked: the plain match list is the result */
    RET_TO_BIF(match_list,DB_ERROR_NONE);

trap:
    BUMP_ALL_REDS(p);

    /* Build a continuation carrying the progress so far and trap */
    hp = HAlloc(p,7);
    continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3],
			  tptr[4], match_list, make_small(got));
    RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF

}

/*
** Select without chunking: forwards to db_select_chunk_hash() with a
** chunk size of 0, which that function treats as "no chunk limit".
*/
static int db_select_hash(Process *p, DbTable *tbl, 
			  Eterm pattern, int reverse,
			  Eterm *ret)
{
    const Sint no_chunk_limit = 0;

    return db_select_chunk_hash(p, tbl, pattern, no_chunk_limit, reverse, ret);
}

/*
** Run a match specification over the table, optionally in chunks.
**
** pattern     the match specification (analyzed and compiled by
**             analyze_pattern())
** chunk_size  0 means no chunking; otherwise the number of results
**             returned per call when scanning with an unbound key
** reverse     not used
** ret         receives the result term
**
** Returns the error code from analyze_pattern() (with *ret = NIL) if the
** pattern is rejected. Otherwise returns DB_ERROR_NONE with *ret set to:
**   - the match list (unchunked),
**   - {Matches, Continuation}, {Matches, am_EOT} or am_EOT (chunked),
**   - or the value of a trap to ets_select_continue_exp when the work
**     budget is used up or heap fragments have been created.
*/
static int db_select_chunk_hash(Process *p, DbTable *tbl, 
				Eterm pattern, Sint chunk_size, 
				int reverse, /* not used */
				Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    struct mp_info mpi;
    Sint slot_ix;
    HashDbTerm *current = 0;
    unsigned current_list_pos = 0;	/* next entry in mpi.lists to visit */
    Eterm match_list;
    Eterm match_res;
    Eterm *hp;
    int num_left = 1000;		/* work budget before trapping */
    Uint got = 0;			/* number of matches collected */
    Eterm continuation;
    int errcode;
    Eterm mpb;
    erts_smp_rwmtx_t* lck;


/* Every exit goes through this macro: it frees the compiled match program
 * (unless mpi.mp has been set to NULL) and a dynamically allocated bucket
 * list before storing the result and returning. */
#define RET_TO_BIF(Term,RetVal) do {		\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);		\
	}					\
	if (mpi.lists != mpi.dlists) {		\
	    erts_free(ERTS_ALC_T_DB_SEL_LIST,	\
		      (void *) mpi.lists);	\
	}					\
	*ret = (Term);				\
	return RetVal;				\
    } while(0)


    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	if (chunk_size) {
	    RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
	}  
	RET_TO_BIF(NIL, DB_ERROR_NONE);
	/* can't possibly match anything */
    }

    if (!mpi.key_given) {
    /* Run this code if pattern is variable or GETKEY(pattern)  */
    /* is a variable                                            */
	/* Find the first non-empty bucket, starting at slot 0.
	 * NOTE(review): next_slot() returning 0 is not followed by an
	 * unlock, so presumably it releases the lock itself -- confirm. */
	slot_ix = 0;
	lck = RLOCK_HASH(tb,slot_ix);
	for (;;) { 
	    ASSERT(slot_ix < NACTIVE(tb));
	    if ((current = BUCKET(tb,slot_ix)) != NULL) {
		break;
	    }
	    slot_ix = next_slot(tb,slot_ix,&lck);
	    if (slot_ix == 0) {
		if (chunk_size) {
		    RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
		}  
		RET_TO_BIF(NIL,DB_ERROR_NONE); 
	    }
	}
    } else {
	/* We have at least one */
	slot_ix = mpi.lists[current_list_pos].ix;
	lck = RLOCK_HASH(tb, slot_ix);
	current = *(mpi.lists[current_list_pos].bucket);
	ASSERT(current == BUCKET(tb,slot_ix));
	++current_list_pos;
    }

    match_list = NIL;

    for(;;) {
	if (current != NULL) {
	    /* Pseudo deleted terms (INVALID_HASH) are skipped */
	    if (current->hvalue != INVALID_HASH) {
		match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0,
					    &current->dbterm, &hp, 2);
		if (is_value(match_res)) {
		    match_list = CONS(hp, match_res, match_list);
		    ++got;
		}
	    }
	    current = current->next;
	}	
	else if (mpi.key_given) {  /* Key is bound */
	    /* End of this bucket: move on to the next listed bucket, if any.
	     * Neither the chunk limit nor the trap budget is checked here. */
	    RUNLOCK_HASH(lck);
	    if (current_list_pos == mpi.num_lists) {
		slot_ix = -1; /* EOT */
		goto done;
	    } else {
		slot_ix = mpi.lists[current_list_pos].ix;
		lck = RLOCK_HASH(tb, slot_ix);
		current = *(mpi.lists[current_list_pos].bucket);
		ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
		++current_list_pos;		       
	    }
	}
	else { /* Key is variable */
	    /* End of this bucket: one unit of the budget per bucket */
	    --num_left;

	    if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
		slot_ix = -1;
		break;
	    }
	    if (chunk_size && got >= chunk_size) {
		RUNLOCK_HASH(lck);
		break;
	    }    
	    if (num_left <= 0 || MBUF(p)) {
		/*
		 * We have either reached our limit, or just created some heap fragments.
		 * Since many heap fragments will make the GC slower, trap and GC now.
		 */
		RUNLOCK_HASH(lck);
		goto trap;
	    }
	    current = BUCKET(tb,slot_ix);
        }
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    if (chunk_size) {
	Eterm continuation;
	Eterm rest = NIL;
	Sint rest_size = 0;

	if (mpi.all_objects)
	    (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
	if (got > chunk_size) { /* Split list in return value and 'rest' */
	    /* The first (got - chunk_size) elements stay in 'rest'; the
	     * list is cut after them and the tail becomes the result. */
	    Eterm tmp = match_list;
	    rest = match_list;
	    while (got-- > chunk_size + 1) { 
		tmp = CDR(list_val(tmp));
		++rest_size;
	    }
	    ++rest_size;
	    match_list = CDR(list_val(tmp));
	    CDR(list_val(tmp)) = NIL; /* Destructive, the list has never 
					 been in 'user space' */ 
	}
	if (rest != NIL || slot_ix >= 0) { /* Need more calls */
	    hp = HAlloc(p,3+7+PROC_BIN_SIZE);
	    mpb =db_make_mp_binary(p,(mpi.mp),&hp);
	    /* Same flag update as at the top of this chunked branch */
	    if (mpi.all_objects)
		(mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
	    continuation = TUPLE6(hp, tb->common.id,make_small(slot_ix), 
				  make_small(chunk_size),  
				  mpb, rest, 
				  make_small(rest_size));
	    mpi.mp = NULL; /*otherwise the return macro will destroy it */
	    hp += 7;
	    RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
	} else { /* All data is exhausted */
	    if (match_list != NIL) { /* No more data to search but still a
					result to return to the caller */
		hp = HAlloc(p, 3);
		RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
	    } else { /* Reached the end of the table with no data to return */
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    }
	}
    }
    /* Not chunked: the plain match list is the result */
    RET_TO_BIF(match_list,DB_ERROR_NONE);
trap:
    BUMP_ALL_REDS(p);
    if (mpi.all_objects)
	(mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
    /* Pack the progress so far into a continuation and trap */
    hp = HAlloc(p,7+PROC_BIN_SIZE);
    mpb =db_make_mp_binary(p,(mpi.mp),&hp);
    continuation = TUPLE6(hp, tb->common.id, make_small(slot_ix), 
			  make_small(chunk_size), 
			  mpb, match_list, 
			  make_small(got));
    mpi.mp = NULL; /*otherwise the return macro will destroy it */
    RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF

}

/*
** Count the objects for which the match specification 'pattern' returns
** am_true.
**
** Returns the error code from analyze_pattern() (with *ret = NIL) if the
** pattern is rejected. Otherwise returns DB_ERROR_NONE with *ret set to
** the count, or to the value of a trap to ets_select_count_continue_exp
** when the work budget runs out during a scan with an unbound key.
*/
static int db_select_count_hash(Process *p, 
				DbTable *tbl, 
				Eterm pattern,
				Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    struct mp_info mpi;
    Uint slot_ix = 0;
    HashDbTerm* current = NULL;
    unsigned current_list_pos = 0;	/* next entry in mpi.lists to visit */
    Eterm *hp;
    int num_left = 1000;		/* work budget before trapping */
    Uint got = 0;			/* number of matches counted */
    Eterm continuation;
    int errcode;
    Eterm egot;
    Eterm mpb;
    erts_smp_rwmtx_t* lck;

/* Every exit goes through this macro: it frees the compiled match program
 * (unless mpi.mp has been set to NULL) and a dynamically allocated bucket
 * list before storing the result and returning. */
#define RET_TO_BIF(Term,RetVal) do {		\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);		\
	}					\
	if (mpi.lists != mpi.dlists) {		\
	    erts_free(ERTS_ALC_T_DB_SEL_LIST,	\
		      (void *) mpi.lists);	\
	}					\
	*ret = (Term);				\
	return RetVal;				\
    } while(0)


    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0), DB_ERROR_NONE);
	/* can't possibly match anything */
    }

    if (!mpi.key_given) {
    /* Run this code if pattern is variable or GETKEY(pattern)  */
    /* is a variable                                            */      
	slot_ix = 0;
	lck = RLOCK_HASH(tb,slot_ix);
	current = BUCKET(tb,slot_ix);
    } else {
	/* We have at least one */
	slot_ix = mpi.lists[current_list_pos].ix;
	lck = RLOCK_HASH(tb, slot_ix);
	current = *(mpi.lists[current_list_pos].bucket);
	ASSERT(current == BUCKET(tb,slot_ix));
	++current_list_pos;
    }

    for(;;) {
	if (current != NULL) {
	    /* Only live terms are matched and charged to the budget */
	    if (current->hvalue != INVALID_HASH) {
		if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
				    &current->dbterm, NULL,0) == am_true) {
		    ++got;
		}
		--num_left;
	    }
	    current = current->next;
	}
	else { /* next bucket */
	    if (mpi.key_given) {  /* Key is bound */
		RUNLOCK_HASH(lck);
		if (current_list_pos == mpi.num_lists) {
		    goto done;
		} else {
		    slot_ix = mpi.lists[current_list_pos].ix;
		    lck = RLOCK_HASH(tb, slot_ix);
		    current = *(mpi.lists[current_list_pos].bucket);
		    ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
		    ++current_list_pos;		       
		}
	    }
	    else {
		/* NOTE(review): no unlock follows next_slot() returning 0,
		 * so presumably it releases the lock itself -- confirm. */
		if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
		    goto done;
		}
		/* The budget is only checked between buckets */
		if (num_left <= 0) {
		    RUNLOCK_HASH(lck);
		    goto trap;
		}		
		current = BUCKET(tb,slot_ix);
	    }
	}
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
trap:
    BUMP_ALL_REDS(p);
    /* Encode the running count as a small integer when it fits, else as a
     * bignum built on the heap ahead of the continuation tuple. */
    if (IS_USMALL(0, got)) {
	hp = HAlloc(p,  PROC_BIN_SIZE + 5);
	egot = make_small(got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    /* Continuation: {TableId, SlotIx, MatchProgBinary, Count} */
    mpb = db_make_mp_binary(p,mpi.mp,&hp);
    continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), 
			  mpb, 
			  egot);
    mpi.mp = NULL; /*otherwise the return macro will destroy it */
    RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}

/*
** Delete every object for which the match specification 'pattern' returns
** am_true, and report how many were deleted.
**
** When the table is fixated by someone else the objects are only marked
** INVALID_HASH (pseudo deleted) and the slot is recorded with
** add_fixed_deletion(); otherwise they are unlinked and freed.
**
** Returns the error code from analyze_pattern() (with *ret = NIL) if the
** pattern is rejected. Otherwise returns DB_ERROR_NONE with *ret set to
** the number of deleted objects, or to the value of a trap to
** ets_select_delete_continue_exp when the work budget runs out during a
** scan with an unbound key.
*/
static int db_select_delete_hash(Process *p,
				 DbTable *tbl,
				 Eterm pattern,
				 Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    struct mp_info mpi;
    Uint slot_ix = 0;
    HashDbTerm **current = NULL;	/* address of the link to the term under inspection */
    unsigned current_list_pos = 0;	/* next entry in mpi.lists to visit */
    Eterm *hp;
    int num_left = 1000;		/* work budget before trapping */
    Uint got = 0;			/* number of objects deleted */
    Eterm continuation;
    int errcode;
    Uint last_pseudo_delete = (Uint)-1;	/* slot of the latest add_fixed_deletion() */
    Eterm mpb;
    Eterm egot;
    /* Number of fixations attributed to this operation itself.
     * NOTE(review): presumably the caller has fixated a table that is not
     * thread safe before calling in -- confirm against erl_db.c. */
#ifdef ERTS_SMP
    erts_aint_t fixated_by_me = tb->common.is_thread_safe ? 0 : 1; /* ToDo: something nicer */
#else
    erts_aint_t fixated_by_me = 0;
#endif
    erts_smp_rwmtx_t* lck;

/* Every exit goes through this macro: it frees the compiled match program
 * (unless mpi.mp has been set to NULL) and a dynamically allocated bucket
 * list before storing the result and returning. */
#define RET_TO_BIF(Term,RetVal) do {		\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);		\
	}					\
	if (mpi.lists != mpi.dlists) {		\
	    erts_free(ERTS_ALC_T_DB_SEL_LIST,	\
		      (void *) mpi.lists);	\
	}					\
	*ret = (Term);				\
	return RetVal;				\
    } while(0)


    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0), DB_ERROR_NONE);
	/* can't possibly match anything */
    }

    if (!mpi.key_given) {
	/* Run this code if pattern is variable or GETKEY(pattern)  */
	/* is a variable                                            */
	lck = WLOCK_HASH(tb,slot_ix);
	current = &BUCKET(tb,slot_ix);
    } else {
	/* We have at least one */
	slot_ix = mpi.lists[current_list_pos].ix;
	lck = WLOCK_HASH(tb, slot_ix);
	current = mpi.lists[current_list_pos++].bucket; 
	ASSERT(*current == BUCKET(tb,slot_ix));
    }


    for(;;) {
	if ((*current) == NULL) {
	    /* End of chain: move on to the next bucket */
	    if (mpi.key_given) {  /* Key is bound */
		WUNLOCK_HASH(lck);
		if (current_list_pos == mpi.num_lists) {
		    goto done;
		} else {
		    slot_ix = mpi.lists[current_list_pos].ix;
		    lck = WLOCK_HASH(tb, slot_ix);
		    current = mpi.lists[current_list_pos].bucket;
		    ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
		    ++current_list_pos;
		}
	    } else {
		/* NOTE(review): no unlock follows next_slot_w() returning
		 * 0, so presumably it releases the lock itself -- confirm. */
		if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
		    goto done;
		}
		/* The budget is only checked between buckets */
		if (num_left <= 0) {
		    WUNLOCK_HASH(lck);
		    goto trap;
		}		
		current = &BUCKET(tb,slot_ix);
	    } 
	}
	else if ((*current)->hvalue == INVALID_HASH) {
	    /* Already pseudo deleted: step over it */
	    current = &((*current)->next);
	} 
	else {
	    int did_erase = 0;
	    if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
				&(*current)->dbterm, NULL, 0) == am_true) {
		if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
		    /* Record the slot at most once per run of deletions
		     * in the same slot */
		    if (slot_ix != last_pseudo_delete) {
			add_fixed_deletion(tb, slot_ix);
			last_pseudo_delete = slot_ix;
		    }
		    (*current)->hvalue = INVALID_HASH;
		} else {
		    /* Unlink and free; *current now refers to the successor */
		    HashDbTerm *del = *current;
		    *current = (*current)->next;
		    free_term(tb, del);
		    did_erase = 1;
		}
		erts_smp_atomic_dec_nob(&tb->common.nitems);
		++got;
	    }	    
	    --num_left;
	    /* After a real erase the link already points at the next term */
	    if (!did_erase) {
		current = &((*current)->next);
	    }
	}
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    if (got) {
	try_shrink(tb);
    }
    RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
trap:
    BUMP_ALL_REDS(p);
    /* Encode the running count as a small integer when it fits, else as a
     * bignum built on the heap ahead of the continuation tuple. */
    if (IS_USMALL(0, got)) {
	hp = HAlloc(p,  PROC_BIN_SIZE + 5);
	egot = make_small(got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    /* Continuation: {TableId, SlotIx, MatchProgBinary, Count} */
    mpb = db_make_mp_binary(p,mpi.mp,&hp);
    continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), 
			  mpb, 
			  egot);
    mpi.mp = NULL; /*otherwise the return macro will destroy it */
    RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF

}
/*
** This is called when select_delete traps.
**
** 'continuation' is read as a 4-tuple {TableId, SlotIx, MatchProgBinary,
** Count}; its elements are decoded without validation here. The scan
** resumes at bucket SlotIx and proceeds bucket by bucket, deleting (or
** pseudo deleting, if the table is fixated by others) every object the
** match program returns am_true for.
**
** Always returns DB_ERROR_NONE, with *ret set to the total number of
** deleted objects or to the value of another trap to
** ets_select_delete_continue_exp.
*/
static int db_select_delete_continue_hash(Process *p, 
					  DbTable *tbl,
					  Eterm continuation,
					  Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Uint slot_ix;
    Uint last_pseudo_delete = (Uint)-1;	/* slot of the latest add_fixed_deletion() */
    HashDbTerm **current = NULL;	/* address of the link to the term under inspection */
    Eterm *hp;
    int num_left = 1000;		/* work budget before trapping again */
    Uint got;
    Eterm *tptr;
    Binary *mp;
    Eterm egot;
    int fixated_by_me = ONLY_WRITER(p,tb) ? 0 : 1; /* ToDo: something nicer */
    erts_smp_rwmtx_t* lck;

#define RET_TO_BIF(Term,RetVal) do {		\
	*ret = (Term);				\
	return RetVal;				\
    } while(0)

    
    /* Unpack the continuation */
    tptr = tuple_val(continuation);
    slot_ix = unsigned_val(tptr[2]);
    mp = ((ProcBin *) binary_val(tptr[3]))->val;
    if (is_big(tptr[4])) {
	got = big_to_uint32(tptr[4]);
    } else {
	got = unsigned_val(tptr[4]);
    }
    
    lck = WLOCK_HASH(tb,slot_ix);
    /* The saved slot is no longer an active bucket: nothing left to scan */
    if (slot_ix >= NACTIVE(tb)) {
	WUNLOCK_HASH(lck);
	goto done;
    } 
    current = &BUCKET(tb,slot_ix);

    for(;;) {
	if ((*current) == NULL) {
	    /* End of chain: advance to the next bucket.
	     * NOTE(review): no unlock follows next_slot_w() returning 0,
	     * so presumably it releases the lock itself -- confirm. */
	    if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
		goto done;
	    }
	    /* The budget is only checked between buckets */
	    if (num_left <= 0) {
		WUNLOCK_HASH(lck);
		goto trap;
	    }		
	    current = &BUCKET(tb,slot_ix);
	}
	else if ((*current)->hvalue == INVALID_HASH) {
	    /* Already pseudo deleted: step over it */
	    current = &((*current)->next);
	} 
	else {
	    int did_erase = 0;
	    if (db_match_dbterm(&tb->common, p, mp, 0,
				&(*current)->dbterm, NULL, 0) == am_true) {
		if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
		    /* Record the slot at most once per run of deletions
		     * in the same slot */
		    if (slot_ix != last_pseudo_delete) {
			add_fixed_deletion(tb, slot_ix);
			last_pseudo_delete = slot_ix;
		    }
		    (*current)->hvalue = INVALID_HASH;
		} else {
		    /* Unlink and free; *current now refers to the successor */
		    HashDbTerm *del = *current;
		    *current = (*current)->next;
		    free_term(tb, del);
		    did_erase = 1;
		}
		erts_smp_atomic_dec_nob(&tb->common.nitems);
		++got;
	    }
	    
	    --num_left;
	    /* After a real erase the link already points at the next term */
	    if (!did_erase) {
		current = &((*current)->next);
	    }
	}
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    if (got) {
	try_shrink(tb);
    }
    RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
trap:
    BUMP_ALL_REDS(p);
    /* Encode the running count as a small integer when it fits, else as a
     * bignum built on the heap ahead of the continuation tuple. */
    if (IS_USMALL(0, got)) {
	hp = HAlloc(p,  5);
	egot = make_small(got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    /* The match program binary from the old continuation is reused */
    continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), 
			  tptr[3], 
			  egot);
    RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF

}
    
/*
** This is called when select_count traps.
**
** 'continuation' is read as a 4-tuple {TableId, SlotIx, MatchProgBinary,
** Count}; its elements are decoded without validation here. Counting
** resumes at bucket SlotIx and proceeds bucket by bucket.
**
** Always returns DB_ERROR_NONE, with *ret set to the total count or to
** the value of another trap to ets_select_count_continue_exp.
*/
static int db_select_count_continue_hash(Process *p, 
					 DbTable *tbl,
					 Eterm continuation,
					 Eterm *ret)
{
    DbTableHash *tb = &tbl->hash;
    Uint slot_ix;
    HashDbTerm* current;
    Eterm *hp;
    int num_left = 1000;	/* work budget before trapping again */
    Uint got;
    Eterm *tptr;
    Binary *mp;
    Eterm egot;
    erts_smp_rwmtx_t* lck;

#define RET_TO_BIF(Term,RetVal) do {		\
	*ret = (Term);				\
	return RetVal;				\
    } while(0)

    
    /* Unpack the continuation */
    tptr = tuple_val(continuation);
    slot_ix = unsigned_val(tptr[2]);
    mp = ((ProcBin *) binary_val(tptr[3]))->val;
    if (is_big(tptr[4])) {
	got = big_to_uint32(tptr[4]);
    } else {
	got = unsigned_val(tptr[4]);
    }
    

    lck = RLOCK_HASH(tb, slot_ix);
    if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
	RUNLOCK_HASH(lck);
	goto done;
    }     
    current = BUCKET(tb,slot_ix);
    
    for(;;) {
	if (current != NULL) {
	    /* Pseudo deleted terms are skipped without using the budget */
	    if (current->hvalue == INVALID_HASH) {
		current = current->next;
		continue;
	    }
	    if (db_match_dbterm(&tb->common, p, mp, 0, &current->dbterm,
				NULL, 0) == am_true) {
		++got;
	    }
	    --num_left;
	    current = current->next;
	}
	else { /* next bucket */
	    /* NOTE(review): no unlock follows next_slot() returning 0, so
	     * presumably it releases the lock itself -- confirm. */
            if ((slot_ix = next_slot(tb,slot_ix,&lck)) == 0) {
		goto done;
	    }
	    /* The budget is only checked between buckets */
	    if (num_left <= 0) {
		RUNLOCK_HASH(lck);
		goto trap;
	    }
	    current = BUCKET(tb,slot_ix);
	}
    }
done:
    BUMP_REDS(p, 1000 - num_left);
    RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
trap:
    BUMP_ALL_REDS(p);
    /* Encode the running count as a small integer when it fits, else as a
     * bignum built on the heap ahead of the continuation tuple. */
    if (IS_USMALL(0, got)) {
	hp = HAlloc(p, 5);
	egot = make_small(got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
	egot = uint_to_big(got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    /* The match program binary from the old continuation is reused */
    continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), 
			  tptr[3], 
			  egot);
    RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, 
			 continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF

}
    
/*
** Other interface routines (not directly coupled to one bif)
*/

/* Initialization entry point for the hash table implementation.
 * The body is empty: nothing is set up here. */
void db_initialize_hash(void)
{
}


/*
** Pseudo delete the entire table contents.
**
** Every term in every active bucket gets its hash value set to
** INVALID_HASH, each non-empty bucket is recorded once with
** add_fixed_deletion(), and the item counter is reset to 0.
** The lock checker asserts that the table is write locked.
** Always returns DB_ERROR_NONE.
*/
int db_mark_all_deleted_hash(DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    int slot;

    ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));

    for (slot = 0; slot < NACTIVE(tb); slot++) {
	HashDbTerm* term = BUCKET(tb,slot);

	if (term == NULL) {
	    continue;
	}
	add_fixed_deletion(tb, slot);
	for (; term != NULL; term = term->next) {
	    term->hvalue = INVALID_HASH;
	}
    }
    erts_smp_atomic_set_nob(&tb->common.nitems, 0);
    return DB_ERROR_NONE;
}


/*
** Display hash table contents (for dump).
**
** Always prints the number of active buckets. If 'show' is non-zero, each
** non-empty bucket is printed as "Ix: [T1,T2,...]", where a pseudo deleted
** term is prefixed with '*' and a term in a compressed table is shown by
** its key only.
*/
static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    int slot;

    erts_print(to, to_arg, "Buckets: %d \n", NACTIVE(tb));

    if (!show) {
	return;
    }

    for (slot = 0; slot < NACTIVE(tb); slot++) {
	HashDbTerm* term = BUCKET(tb,slot);

	if (term == NULL) {
	    continue;
	}
	erts_print(to, to_arg, "%d: [", slot);
	for (; term != NULL; term = term->next) {
	    if (term->hvalue == INVALID_HASH) {
		erts_print(to, to_arg, "*");
	    }
	    if (tb->common.compress) {
		Eterm key = GETKEY(tb, term->dbterm.tpl);
		erts_print(to, to_arg, "key=%R", key, term->dbterm.tpl);
	    }
	    else {
		Eterm obj = make_tuple_rel(term->dbterm.tpl,term->dbterm.tpl);
		erts_print(to, to_arg, "%R", obj, term->dbterm.tpl);
	    }
	    if (term->next != NULL) {
		erts_print(to, to_arg, ",");
	    }
	}
	erts_print(to, to_arg, "]\n");
    }
}

/*
** Release all memory occupied by a single table in one go, by calling
** db_free_table_continue_hash() until it reports that it is done.
** Always returns 0.
*/
static int db_free_table_hash(DbTable *tbl)
{
    for (;;) {
	if (db_free_table_continue_hash(tbl)) {
	    break;
	}
    }
    return 0;
}

/*
** Do one bounded portion of the work of freeing a table.
**
** First the list of FixedDeletion records is freed, then the segments
** (with their records) from the top down, and finally, in SMP builds, the
** fine grained locks. The amount of work per call is limited through
** DELETE_RECORD_LIMIT.
**
** Returns 0 if there is more to do (call again), 1 when everything has
** been released.
*/
static int db_free_table_continue_hash(DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    FixedDeletion* pending = (FixedDeletion*) erts_smp_atomic_read_acqb(&tb->fixdel);
    int work = 0;
    ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));

    /* Phase 1: the fixed deletion records */
    while (pending != NULL) {
	FixedDeletion *victim = pending;

	pending = victim->next;
	erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
		     (DbTable *) tb,
		     (void *) victim,
		     sizeof(FixedDeletion));
	ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
	++work;
	if (work >= 2*DELETE_RECORD_LIMIT) {
	    /* Save what is left of the list for the next call */
	    erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)pending);
	    return 0;		/* Not done */
	}
    }
    erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);

    /* Phase 2: the segments; the work done so far counts for half */
    work /= 2;
    while (tb->nslots != 0) {
	free_seg(tb, 1);

	++work;
	if (work >= (DELETE_RECORD_LIMIT / CHAIN_LEN / SEGSZ)) {
	    return 0;	/* Not done */
	}
    }

#ifdef ERTS_SMP
    /* Phase 3: the fine grained locks, if the table has any */
    if (tb->locks != NULL) {
	int lock_ix;
	for (lock_ix = 0; lock_ix < DB_HASH_LOCK_CNT; ++lock_ix) {
	    erts_rwmtx_destroy(GET_LOCK(tb,lock_ix));
	}
	erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
		     (void*)tb->locks, sizeof(DbTableHashFineLocks));
	tb->locks = NULL;
    }
#endif
    ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
    return 1;			/* Done */
}



/*
** Utility routines. (static)
*/
/*
** For the select functions, analyzes the pattern and determines which
** slots should be searched. Also compiles the match program.
**
** pattern  a proper list of {Head, Guards, Body} 3-tuples
** mpi      filled in with the result:
**   lists/num_lists      the distinct buckets to search; only meaningful
**                        when key_given is set
**   key_given            1 if every clause has a fully bound key, else 0
**   something_can_match  0 if no clause can match anything in the table
**   all_objects          1 if every body is exactly ['$_']
**   mp                   the compiled match program
**
** Returns DB_ERROR_NONE on success and DB_ERROR_BADPARAM if the pattern is
** not a proper list of 3-tuples or the match spec does not compile.
**
** The temporary buffer is always freed here. mpi->lists, however, is not
** freed on the error paths even if it was allocated dynamically; the
** RET_TO_BIF macros of the select functions free it whenever it differs
** from mpi->dlists.
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern, 
			   struct mp_info *mpi)
{
    Eterm *ptpl;
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];		/* room for heads, guards and bodies of 10 clauses */
    Eterm *buff = sbuff;
    Eterm key = NIL;	       
    HashValue hval = NIL;      
    int num_heads = 0;
    int i;

    mpi->lists = mpi->dlists;
    mpi->num_lists = 0;
    mpi->key_given = 1;
    mpi->something_can_match = 0;
    mpi->all_objects = 1;
    mpi->mp = NULL;

    /* Count the clauses */
    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
	++num_heads;

    if (lst != NIL) {/* proper list... */
	return DB_ERROR_BADPARAM;
    }

    /* More than 10 clauses: switch to dynamically allocated buffers */
    if (num_heads > 10) {
	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
	mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST,
				sizeof(*(mpi->lists)) * num_heads);	
    }

    /* The buffer holds three consecutive arrays of num_heads terms each */
    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
	Eterm body;
	ttpl = CAR(list_val(lst));
	if (!is_tuple(ttpl)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	ptpl = tuple_val(ttpl);
	if (ptpl[0] != make_arityval(3U)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	matches[i] = tpl = ptpl[1];
	guards[i] = ptpl[2];
	bodies[i] = body = ptpl[3];
	/* all_objects survives only if the body is the one element list
	 * containing am_DollarUnderscore */
	if (!is_list(body) || CDR(list_val(body)) != NIL ||
	    CAR(list_val(body)) != am_DollarUnderscore) {
	    mpi->all_objects = 0;
	}
	++i;
	/* An earlier clause already forces a full table scan; the remaining
	 * clauses are only collected for compilation */
	if (!(mpi->key_given)) {
	    continue;
	}
	if (tpl == am_Underscore || db_is_variable(tpl) != -1) {
	    /* The whole head is a variable: any object may match */
	    (mpi->key_given) = 0;
	    (mpi->something_can_match) = 1;
	} else {
	    key = db_getkey(tb->common.keypos, tpl);
	    if (is_value(key)) {
		if (!db_has_variable(key)) {   /* Bound key */
		    int ix, search_slot;
		    HashDbTerm** bp;
		    erts_smp_rwmtx_t* lck;
		    hval = MAKE_HASH(key);
		    lck = RLOCK_HASH(tb,hval);
		    ix = hash_to_ix(tb, hval);
		    bp = &BUCKET(tb,ix);
		    if (lck == NULL) {
			/* No lock was taken: look the key up right away and
			 * only list the bucket if the key is found */
			search_slot = search_list(tb,key,hval,*bp) != NULL;
		    } else {
			/* No point to verify if key exist now as there may be
			   concurrent inserters/deleters anyway */
			RUNLOCK_HASH(lck);
			search_slot = 1;
		    }
		    if (search_slot) {
			/* Add the bucket to mpi->lists unless it is already
			 * there */
			int j;
			for (j=0; ; ++j) {
			    if (j == mpi->num_lists) {
				mpi->lists[mpi->num_lists].bucket = bp;
				mpi->lists[mpi->num_lists].ix = ix;
				++mpi->num_lists;
				break;
			    }
			    if (mpi->lists[j].bucket == bp) {
				ASSERT(mpi->lists[j].ix == ix);
				break;
			    }
			    ASSERT(mpi->lists[j].ix != ix);
			}
			mpi->something_can_match = 1;
		    }
		} else {
		    /* The key contains variables: full table scan needed */
		    mpi->key_given = 0;
		    mpi->something_can_match = 1;
		}
	    }
	}
    }

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad 
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
				    num_heads, DCOMP_TABLE, NULL)) 
	== NULL) {
	if (buff != sbuff) { 
	    erts_free(ERTS_ALC_T_DB_TMP, buff);
	}
	return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) { 
	erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}

/*
** Allocate an extended segment, that is a segment that also carries a
** segment table, to be used as segment number 'seg_ix'.
**
** The size of the new segment table is NSEG_1 for seg_ix 0, NSEG_2 for
** seg_ix 1 and seg_ix + NSEG_INC otherwise. If 'old_segtab' is non-NULL,
** its tb->nsegs entries are copied into the new table and it is remembered
** as prev_segtab. Entry 'seg_ix' of the new table is set to point at the
** new segment itself, whose buckets are zeroed.
**
** Returns the new extended segment, or NULL if the allocation failed.
** The allocation is done with erts_db_alloc_fnf(), which can return NULL
** (alloc_seg() checks for that on both of its allocation paths), so the
** result must be tested before it is touched.
*/
static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
					 struct segment** old_segtab)
{
    int nsegs;
    struct ext_segment* eseg;
    
    switch (seg_ix) {
    case 0: nsegs = NSEG_1; break;
    case 1: nsegs = NSEG_2; break; 
    default: nsegs = seg_ix + NSEG_INC; break;
    }    
    eseg = (struct ext_segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
						   (DbTable *) tb,
						   SIZEOF_EXTSEG(nsegs));
    if (eseg == NULL) {
	/* Out of memory: let the caller decide what to do */
	return NULL;
    }
    sys_memset(&eseg->s, 0, sizeof(struct segment));
    IF_DEBUG(eseg->s.is_ext_segment = 1);
    eseg->prev_segtab = old_segtab;
    eseg->nsegs = nsegs;
    if (old_segtab) {
	ASSERT(nsegs > tb->nsegs);
	sys_memcpy(eseg->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
    }
#ifdef DEBUG
    /* Only debug builds clear the not yet used part of the new table */
    sys_memset(&eseg->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
    eseg->segtab[seg_ix] = &eseg->s;
    return eseg;
}

/* Extend the table with one new segment, i.e. SEGSZ more slots.
**
** When the new segment takes the last entry of the current segment table
** it is allocated as an extended segment carrying a bigger table; that
** bigger table is not put into use until the following call (see the
** "shrink race" comment in free_seg()).
**
** Returns 1 on success and 0 if memory could not be allocated, in which
** case tb->nslots is left unchanged.
*/
static int alloc_seg(DbTableHash *tb)
{    
    int new_ix = tb->nslots >> SEGSZ_EXP;
    struct segment** tab;

    if (new_ix+1 != tb->nsegs) {
	/* A plain segment will do */
	if (new_ix == tb->nsegs) {
	    /* The current table is full: start using the table that was
	     * allocated together with the previous segment */
	    struct ext_segment* prev;
	    prev = (struct ext_segment*) SEGTAB(tb)[new_ix-1];
	    MY_ASSERT(prev!=NULL && prev->s.is_ext_segment);
	    SET_SEGTAB(tb, prev->segtab);
	    tb->nsegs = prev->nsegs;
	}
	ASSERT(new_ix < tb->nsegs);
	tab = SEGTAB(tb);
	ASSERT(tab[new_ix] == NULL);
	tab[new_ix] = (struct segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
							  (DbTable *) tb,
							  sizeof(struct segment));
	if (tab[new_ix] == NULL) {
	    return 0;
	}
	sys_memset(tab[new_ix], 0, sizeof(struct segment));
    }
    else {
	/* Last entry of the current table: new segtab needed
	 * (extended segment) */
	struct ext_segment* ext;

	tab = SEGTAB(tb);
	ext = alloc_ext_seg(tb, new_ix, tab);
	if (ext == NULL) {
	    return 0;
	}
	tab[new_ix] = &ext->s;
	/* We don't use the new segtab until next call (see "shrink race") */
    }
    tb->nslots += SEGSZ;
    return 1;
}

/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty 
**
** Decrements tb->nslots by SEGSZ and returns the number of records that
** were freed along with the segment.
*/
static int free_seg(DbTableHash *tb, int free_records)
{
    int seg_ix = (tb->nslots >> SEGSZ_EXP) - 1;	/* index of the top segment */
    int bytes;
    struct segment** segtab = SEGTAB(tb);
    struct ext_segment* top = (struct ext_segment*) segtab[seg_ix];
    int nrecords = 0;

    ASSERT(top != NULL); 
    /* In non-debug builds the bucket scan below is skipped when
     * free_records is 0. Debug builds always scan, and the ASSERT inside
     * the loop then verifies that the segment really was empty. */
#ifndef DEBUG
    if (free_records)
#endif
    {	
	int i;
	for (i=0; i<SEGSZ; ++i) {
	    HashDbTerm* p = top->s.buckets[i];	    
	    while(p != 0) {		
		HashDbTerm* nxt = p->next;
		ASSERT(free_records); /* segment not empty as assumed? */
		free_term(tb, p);
		p = nxt;
		++nrecords;
	    }
	}
    }

    /* The "shrink race":
     * We must avoid deallocating an extended segment while its segtab may
     * still be used by other threads.
     * The trick is to stop use a segtab one call earlier. That is, stop use
     * a segtab when the segment above it is deallocated. When the segtab is
     * later deallocated, it has not been used for a very long time.
     * It is even theoretically safe as we have by then rehashed the entire
     * segment, seizing *all* locks, so there cannot exist any retarded threads
     * still hanging in BUCKET macro with an old segtab pointer.
     * For this to work, we must of course allocate a new segtab one call
     * earlier in alloc_seg() as well. And this is also the reason why
     * the minimum size of the first segtab is 2 and not 1 (NSEG_1).
     */
    
    if (seg_ix == tb->nsegs-1 || seg_ix==0) { /* Dealloc extended segment */
	MY_ASSERT(top->s.is_ext_segment);   
    	ASSERT(segtab != top->segtab || seg_ix==0);    
	bytes = SIZEOF_EXTSEG(top->nsegs);
    }
    else { /* Dealloc plain segment */
	struct ext_segment* newtop = (struct ext_segment*) segtab[seg_ix-1];
	MY_ASSERT(!top->s.is_ext_segment);
	
	/* If the table in use belongs to the segment just below, fall back
	 * to the previous (smaller) table now, one call before that
	 * segment itself is freed */
	if (segtab == newtop->segtab) { /* New top segment is extended */
	    MY_ASSERT(newtop->s.is_ext_segment);
	    if (newtop->prev_segtab != NULL) {
		/* Time to use a smaller segtab */
		SET_SEGTAB(tb, newtop->prev_segtab);
		tb->nsegs = seg_ix;
		ASSERT(tb->nsegs == EXTSEG(SEGTAB(tb))->nsegs);
	    }
	    else {
		ASSERT(NSEG_1 > 2 && seg_ix==1);
	    }
	}
	bytes = sizeof(struct segment);
    }
    
    erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
		 (void*)top, bytes);
    /* Debug builds also clear the stale pointer to the freed segment */
#ifdef DEBUG
    if (seg_ix > 0) {
	if (seg_ix < tb->nsegs) SEGTAB(tb)[seg_ix] = NULL;
    } else {
	SET_SEGTAB(tb, NULL);
    }
#endif
    tb->nslots -= SEGSZ;
    ASSERT(tb->nslots >= 0);
    return nrecords;
}


/*
** Build a list, on the heap of process p, of copies of the live terms
** in the chain section [ptr1, ptr2).
**
** - ptr1 == ptr2 (including both being 0) yields the empty list.
** - ptr2 == 0 means "copy until the end of the chain".
** - Terms marked with INVALID_HASH are skipped.
**
** Every copy is consed onto the front of the result.
*/
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
			   DbTableHash* tb)
{
    HashDbTerm* cur;
    Eterm* htop;
    Eterm* hlimit;
    Eterm result = NIL;
    int need = 0;

    /* Pass 1: sum up the heap need, term size plus 2 words per cons cell */
    for (cur = ptr1; cur != ptr2; cur = cur->next) {
	if (cur->hvalue == INVALID_HASH) {
	    continue;
	}
	need += cur->dbterm.size + 2;
    }

    htop = HAlloc(p, need);
    hlimit = htop + need;

    /* Pass 2: copy each live term and link it into the result */
    for (cur = ptr1; cur != ptr2; cur = cur->next) {
	Eterm obj;

	if (cur->hvalue == INVALID_HASH) {
	    continue;
	}
	obj = db_copy_object_from_ets(&tb->common, &cur->dbterm, &htop, &MSO(p));
	result = CONS(htop, obj, result);
	htop += 2;
    }

    /* Hand back whatever part of the allocation was not consumed */
    HRelease(p,hlimit,htop);

    return result;
}

/* Grow table with one new bucket.
** Allocate new segment if needed.
**
** tb:      the hash table to grow.
** nactive: the number of active buckets as observed by the caller. The new
**          bucket gets index 'nactive'. If the table's actual count no
**          longer matches, someone else already resized and we do nothing.
**
** The is_resizing flag makes sure only one resize runs at a time.
** The operation is silently abandoned if a resize is already in progress,
** if a new segment cannot be allocated, or if the table turns out to be
** fixed once the bucket lock is held.
*/
static void grow(DbTableHash* tb, int nactive)
{
    HashDbTerm** pnext;
    HashDbTerm** to_pnext;
    HashDbTerm* p;
    erts_smp_rwmtx_t* lck;
    int from_ix;
    int szm;

    /* Claim the resize flag; a previous value of 1 means we lost the race */
    if (erts_smp_atomic_xchg_acqb(&tb->is_resizing, 1)) { 
	return; /* already in progress */
    }
    if (NACTIVE(tb) != nactive) {
	goto abort; /* already done (race) */
    }

    /* Ensure that the slot nactive exists */
    if (nactive == tb->nslots) {
	/* Time to get a new segment */    
	ASSERT((nactive & SEGSZ_MASK) == 0);
	if (!alloc_seg(tb)) goto abort;	    
    }
    ASSERT(nactive < tb->nslots);

    /* Pick the bucket to split. As long as nactive fits under the current
     * mask, it is nactive with the top mask bit cleared. Otherwise a new
     * round starts: bucket 0 is split and the mask is widened by one bit.
     */
    szm = erts_smp_atomic_read_nob(&tb->szm);
    if (nactive <= szm) {
	from_ix = nactive & (szm >> 1);
    } else {
	ASSERT(nactive == szm+1);
	from_ix = 0;
	szm = (szm<<1) | 1;
    }

    lck = WLOCK_HASH(tb, from_ix);
    /* Now a final double check (with the from_ix lock held)
     * that we did not get raced by a table fixer.
     */
    if (IS_FIXED(tb)) {
	WUNLOCK_HASH(lck);
	goto abort;
    }
    /* Publish the new size. The widened mask is only stored when bucket 0
     * is the one being split. The resize flag is dropped here, before the
     * actual split, which is done under the bucket lock only.
     */
    erts_smp_atomic_inc_nob(&tb->nactive);
    if (from_ix == 0) {
	erts_smp_atomic_set_relb(&tb->szm, szm);
    }
    erts_smp_atomic_set_relb(&tb->is_resizing, 0);

    /* Finally, let's split the bucket. We try to do it in a smart way
       to keep link order and avoid unnecessary updates of next-pointers.

       pnext is the link through which we reached p, i.e. the tail link of
       the chain we are currently following (bucket from_ix). to_pnext is
       the tail link of the other chain. A next-pointer is only written
       when a term belongs to the other chain than its predecessor. */
    pnext = &BUCKET(tb, from_ix);
    p = *pnext;
    to_pnext = &BUCKET(tb, nactive);
    while (p != NULL) {
	if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */
	    /* Pseudo deleted term: unlink and free it while we are here */
	    *pnext = p->next;
	    free_term(tb, p);
	    p = *pnext;
	}
	else {
	    int ix = p->hvalue & szm;
	    if (ix != from_ix) {
		/* p belongs to the other chain: hook it onto that chain's
		   tail and let the two chains switch roles */
		ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
		*to_pnext = p;
		/* Swap "from" and "to": */
		from_ix = ix;
		to_pnext = pnext;
	    }
	    pnext = &p->next;
	    p = *pnext;		
	}
    }
    /* Terminate the chain we were not following when the walk ended */
    *to_pnext = NULL;

    WUNLOCK_HASH(lck);
    return;
   
abort:
    /* Nothing was changed; just give up the resize flag */
    erts_smp_atomic_set_relb(&tb->is_resizing, 0);
}


/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
**
** tb:      the hash table to shrink.
** nactive: the number of active buckets as observed by the caller. The
**          bucket removed is the one with index nactive-1. If the table's
**          actual count no longer matches, nothing is done.
**
** The is_resizing flag makes sure only one resize runs at a time; it is
** held until the very end, including the possible segment deallocation.
** Nothing is changed if the table turns out to be fixed once the bucket
** lock is held.
*/
static void shrink(DbTableHash* tb, int nactive)
{     
    /* Claim the resize flag; a previous value of 1 means we lost the race */
    if (erts_smp_atomic_xchg_acqb(&tb->is_resizing, 1)) {
	return; /* already in progress */
    }
    if (NACTIVE(tb) == nactive) {
	erts_smp_rwmtx_t* lck;
	int src_ix = nactive - 1;   /* top bucket, to be emptied */
	int low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1;
	int dst_ix = src_ix & low_szm;   /* bucket receiving the terms */

	ASSERT(dst_ix < src_ix);
	ASSERT(nactive > SEGSZ);
	/* NOTE(review): only the lock for dst_ix is taken; presumably the
	   same lock also covers src_ix - confirm against the lock mapping */
	lck = WLOCK_HASH(tb, dst_ix);
	/* Double check for racing table fixers */
	if (!IS_FIXED(tb)) {
	    HashDbTerm** src_bp = &BUCKET(tb, src_ix);
	    HashDbTerm** dst_bp = &BUCKET(tb, dst_ix);
	    HashDbTerm** bp = src_bp;

	    /* Q: Why join lists by appending "dst" at the end of "src"?
	       A: Must step through "src" anyway to purge pseudo deleted. */
	    while(*bp != NULL) {
		if ((*bp)->hvalue == INVALID_HASH) {
		    HashDbTerm* deleted = *bp;
		    *bp = deleted->next;
		    free_term(tb, deleted);
		} else {
		    bp = &(*bp)->next;
		}
	    }
	    /* bp is now the tail link of "src": hang "dst" onto it, let
	       the dst bucket start with the joined chain, and empty src */
	    *bp = *dst_bp;
	    *dst_bp = *src_bp;
	    *src_bp = NULL;
	    
	    /* Publish the new size. The narrower mask is only stored when
	       the receiving bucket is bucket 0. */
	    erts_smp_atomic_set_nob(&tb->nactive, src_ix);
	    if (dst_ix == 0) {
		erts_smp_atomic_set_relb(&tb->szm, low_szm);
	    }
	    WUNLOCK_HASH(lck);
	    
	    /* At least a whole segment of unused slots on top: release it.
	       NOTE(review): the 0 argument presumably says there are no
	       records left to free in that segment - confirm */
	    if (tb->nslots - src_ix >= SEGSZ) {
		free_seg(tb, 0);
	    }
	}
	else {
	    WUNLOCK_HASH(lck);
	}

    }
    /*else already done */
    erts_smp_atomic_set_relb(&tb->is_resizing, 0);
}


/* Walk a bucket chain looking for a live term with the given key.
** hval is the hash value of key.
** Returns the first matching term, or 0 if the chain holds none.
*/
static HashDbTerm* search_list(DbTableHash* tb, Eterm key, 
			       HashValue hval, HashDbTerm *list)
{
    HashDbTerm* cur;

    for (cur = list; cur != 0; cur = cur->next) {
	if (!has_live_key(tb,cur,key,hval)) {
	    continue;
	}
	return cur;
    }
    return 0;
}


/* Used by the next AND the select BIF.
** Returns the next live object after 'list', or NULL if there are no more.
**
** The rest of the current chain is examined first; after that the
** following slots are visited through next_slot() until a live object is
** found or next_slot() returns 0. *iptr is updated to the slot of the
** returned object when that object was found in a later slot.
**
** In-bucket: RLOCKED
** Out-bucket: RLOCKED unless NULL
*/
static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
			HashDbTerm *list)
{
    int slot;

    ERTS_SMP_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr));

    /* Remainder of the chain we are standing in */
    list = list->next;
    while (list != NULL) {
	if (list->hvalue != INVALID_HASH) {
	    return list;
	}
	list = list->next;
    }

    /* Nothing more here, move on slot by slot */
    slot = *iptr;
    for (;;) {
	slot = next_slot(tb, slot, lck_ptr);
	if (slot == 0) {
	    break;
	}
	for (list = BUCKET(tb,slot); list != NULL; list = list->next) {
	    if (list->hvalue != INVALID_HASH) {
		*iptr = slot;
		return list;
	    }
	}
    }
    /* *iptr is left untouched when the table is exhausted */
    return NULL;
}

/* Look up the live term with the given key for an in-place update.
**
** Returns 1 if found. The handle is then filled in to describe the term
** and the bucket is left WLOCKED; db_finalize_dbterm_hash must be called
** to release it.
** Returns 0 if no such term exists. The bucket lock has then already
** been released and the handle is untouched.
*/
static int db_lookup_dbterm_hash(DbTable *tbl, Eterm key, DbUpdateHandle* handle)
{
    DbTableHash *tb = &tbl->hash;
    HashValue hval = MAKE_HASH(key);
    erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,hval);
    int ix = hash_to_ix(tb, hval);
    HashDbTerm** link = &BUCKET(tb, ix);  /* the pointer leading to 'cur' */
    HashDbTerm* cur;

    for (cur = *link; cur != 0; link = &cur->next, cur = *link) {
	if (!has_live_key(tb,cur,key,hval)) {
	    continue;
	}
	handle->tb = tbl;
	handle->bp = (void**) link;
	handle->dbterm = &cur->dbterm;
	handle->mustResize = 0;
	handle->new_size = cur->dbterm.size;
#if HALFWORD_HEAP
	handle->abs_vec = NULL;
#endif
	handle->lck = lck;
	/* KEEP hval WLOCKED, db_finalize_dbterm_hash will WUNLOCK */
	return 1;
    }

    WUNLOCK_HASH(lck);
    return 0;
}

/* Finish an update started with a successful db_lookup_dbterm_hash.
** Must be called after such a call; releases the bucket lock stored in
** the handle.
**
** If the handle says the term must be resized, db_finalize_resize is
** run while the lock is still held and the old term is freed once the
** lock has been released.
*/
static void db_finalize_dbterm_hash(DbUpdateHandle* handle)
{
    DbTable* table = handle->tb;
    HashDbTerm* old_term = (HashDbTerm*) *(handle->bp);
    erts_smp_rwmtx_t* bucket_lck = (erts_smp_rwmtx_t*) handle->lck;

    ERTS_SMP_LC_ASSERT(IS_HASH_WLOCKED(&table->hash,bucket_lck));  /* locked by db_lookup_dbterm_hash */

    /* handle->dbterm is the term in the table, except when a compressed
       table needs a resize */
    ASSERT((&old_term->dbterm == handle->dbterm) == !(table->common.compress && handle->mustResize));

    if (!handle->mustResize) {
	WUNLOCK_HASH(bucket_lck);
    }
    else {
	db_finalize_resize(handle, offsetof(HashDbTerm,dbterm));
	WUNLOCK_HASH(bucket_lck);

	free_term(&table->hash, old_term);
    }
#ifdef DEBUG
    handle->dbterm = 0;
#endif
}

/* Remove all objects from the table. Always returns 0.
**
** A fixed table only gets its objects marked as deleted. Otherwise the
** table is freed and created anew, and the item count is reset to zero.
*/
static int db_delete_all_objects_hash(Process* p, DbTable* tbl)
{
    if (!IS_FIXED(tbl)) {
	db_free_table_hash(tbl);
	db_create_hash(p, tbl);
	erts_smp_atomic_set_nob(&tbl->hash.common.nitems, 0);
	return 0;
    }

    db_mark_all_deleted_hash(tbl);
    return 0;
}

/* Call func once for every term in every active bucket of the table.
**
** func is given a temporary ErlOffHeap whose 'first' field is the term's
** off-heap list (and whose 'overhead' is 0), together with arg. Whatever
** func leaves in 'first' is written back to the term afterwards.
** The number of active buckets is read once, before the walk starts.
*/
void db_foreach_offheap_hash(DbTable *tbl,
			     void (*func)(ErlOffHeap *, void *),
			     void * arg)
{
    DbTableHash *tb = &tbl->hash;
    int nbuckets = NACTIVE(tb);
    int slot;

    for (slot = 0; slot < nbuckets; slot++) {
	HashDbTerm* term;

	for (term = BUCKET(tb,slot); term != 0; term = term->next) {
	    ErlOffHeap scratch;

	    scratch.first = term->dbterm.first_oh;
	    scratch.overhead = 0;
	    (*func)(&scratch, arg);
	    term->dbterm.first_oh = scratch.first;
	}
    }
}

/* Collect chain length statistics for the table into *stats:
** shortest and longest chain, average chain length, standard deviation
** of the chain lengths and the standard deviation to expect from a good
** hash function.
**
** Buckets are visited from slot 0 and onwards with next_slot(), starting
** with slot 0 read locked. Every term in a chain is counted, no matter
** its hash value.
*/
void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
{
    erts_smp_rwmtx_t* lck;
    int total = 0;      /* sum of all chain lengths */
    int total_sq = 0;   /* sum of all squared chain lengths */
    int slot = 0;

    stats->min_chain_len = INT_MAX;
    stats->max_chain_len = 0;

    lck = RLOCK_HASH(tb,slot);
    do {
	HashDbTerm* term = BUCKET(tb,slot);
	int chain = 0;

	while (term != NULL) {
	    ++chain;
	    term = term->next;
	}
	total += chain;
	total_sq += chain*chain;
	if (chain < stats->min_chain_len) {
	    stats->min_chain_len = chain;
	}
	if (chain > stats->max_chain_len) {
	    stats->max_chain_len = chain;
	}
	slot = next_slot(tb,slot,&lck);
    } while (slot != 0);

    stats->avg_chain_len = (float)total / NACTIVE(tb);
    stats->std_dev_chain_len = sqrt((total_sq - stats->avg_chain_len*total) / NACTIVE(tb));
    /* Expected standard deviation from a good uniform hash function,
       ie binomial distribution (not taking the linear hashing into account) */
    stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
}
#ifdef HARDDEBUG

/* Sanity check of the whole table (only built with HARDDEBUG).
**
** Visits every term in every active bucket and exits the emulator if the
** stored object is not a tuple.
**
** The number of active buckets is fetched with NACTIVE(), like in the
** rest of this file, as tb->nactive is an atomic variable and must not
** be read directly.
*/
void db_check_table_hash(DbTable *tbl)
{
    DbTableHash *tb = &tbl->hash;
    HashDbTerm* list;
    int nactive = NACTIVE(tb);
    int j;
    
    for (j = 0; j < nactive; j++) {
	for (list = BUCKET(tb,j); list != 0; list = list->next) {
	    if (!is_tuple(make_tuple(list->dbterm.tpl))) {
		erl_exit(1, "Bad term in slot %d of ets table", j);
	    }
	}
    }
}

#endif