/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1998-2013. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
** Implementation of ordered ETS tables.
** The tables are implemented as AVL trees (Published by Adelson-Velski 
** and Landis). A nice source for learning about these trees is
** Wirth's Algorithms + Datastructures = Programs.
** The implementation here is however not made with recursion
** as the examples in Wirths book are.
*/

/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "erl_binary.h"

#include "erl_db_tree.h"

#define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems))

/*
** A stack of this size is enough for an AVL tree with more than
** 0xFFFFFFFF elements. May be subject to change if
** the datatype of the element counter is changed to a 64 bit integer.
** The Maximal height of an AVL tree is calculated as:
** h(n) <= 1.4404 * log(n + 2) - 0.328
** Where n denotes the number of nodes, h(n) the height of the tree
** with n nodes and log is the binary logarithm.
*/

#define STACK_NEED 50
#define TREE_MAX_ELEMENTS 0xFFFFFFFFUL

#define PUSH_NODE(Dtt, Tdt)                     \
    ((Dtt)->array[(Dtt)->pos++] = Tdt)

#define POP_NODE(Dtt)			\
     (((Dtt)->pos) ? 			\
      (Dtt)->array[--((Dtt)->pos)] : NULL)

#define TOP_NODE(Dtt)                   \
     ((Dtt->pos) ? 			\
      (Dtt)->array[(Dtt)->pos - 1] : NULL)

#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL)



/* Obtain table static stack if available. NULL if not.
** Must be released with release_stack()
*/
static DbTreeStack* get_static_stack(DbTableTree* tb)
{
    if (!erts_smp_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
	return &tb->static_stack;
    }
    return NULL;
}

/* Obtain static stack if available, otherwise empty dynamic stack.
** Must be released with release_stack()
*/
static DbTreeStack* get_any_stack(DbTableTree* tb)
{
    DbTreeStack* stack;
    if (!erts_smp_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
	return &tb->static_stack;
    }
    stack = erts_db_alloc(ERTS_ALC_T_DB_STK, (DbTable *) tb,
			  sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
    stack->pos = 0;
    stack->slot = 0;
    stack->array = (TreeDbTerm**) (stack + 1);
    return stack;
}

static void release_stack(DbTableTree* tb, DbTreeStack* stack)
{
    if (stack == &tb->static_stack) {
	ASSERT(erts_smp_atomic_read_nob(&tb->is_stack_busy) == 1);
	erts_smp_atomic_set_relb(&tb->is_stack_busy, 0);
    }
    else {
	erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb,
		     (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
    }
}

static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
    tb->static_stack.pos = 0;
    tb->static_stack.slot = 0;
}

static ERTS_INLINE void free_term(DbTableTree *tb, TreeDbTerm* p)
{
    db_free_term((DbTable*)tb, p, offsetof(TreeDbTerm, dbterm));
}

static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableTree *tb, Eterm obj)
{
    TreeDbTerm* p;
    if (tb->common.compress) {
	p = db_store_term_comp(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
    }
    else {
	p = db_store_term(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
    }
    return p;
}
static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old,
					      Eterm obj)
{
    TreeDbTerm* p;
    ASSERT(old != NULL);
    if (tb->common.compress) {
	p = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
    }
    else {
	p = db_store_term(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
    }
    return p;
}

/*
** Some macros for "direction stacks"
*/
#define DIR_LEFT 0
#define DIR_RIGHT 1
#define DIR_END 2 

/*
 * Special binary flag
 */
#define BIN_FLAG_ALL_OBJECTS         BIN_FLAG_USR1

/*
 * Number of records to delete before trapping.
 */
#define DELETE_RECORD_LIMIT 12000

/* 
** Debugging
*/
#ifdef HARDDEBUG
static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to);
static void check_slot_pos(DbTableTree *tb);
static void check_saved_stack(DbTableTree *tb);
static int check_table_tree(DbTableTree* tb, TreeDbTerm *t);

#define TREE_DEBUG
#endif

#ifdef TREE_DEBUG
/*
** Primitive trace macro
*/
#define DBG erts_fprintf(stderr,"%d\n",__LINE__)

/*
** Debugging dump
*/

static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
			  TreeDbTerm *t, int offset);

#else

#define DBG /* nothing */

#endif

/*
** Datatypes
*/

/* 
 * This structure is filled in by analyze_pattern() for the select 
 * functions.
 */
struct mp_info {
    int all_objects;		/* True if complete objects are always
				 * returned from the match_spec (can use 
				 * copy_shallow on the return value) */
    int something_can_match;	/* The match_spec is not "impossible" */
    int some_limitation;	/* There is some limitation on the search
				 * area, i. e. least and/or most is set.*/
    int got_partial;		/* The limitation has a partially bound
				 * key */
    Eterm least;		/* The lowest matching key (possibly 
				 * partially bound expression) */
    Eterm most;                 /* The highest matching key (possibly 
				 * partially bound expression) */

    TreeDbTerm *save_term;      /* If the key is completely bound, this
				 * will be the Tree node we're searching
				 * for, otherwise it will be useless */
    Binary *mp;                 /* The compiled match program */
};

/*
 * Used by doit_select(_chunk)
 */
struct select_context {
    Process *p;
    Eterm accum;
    Binary *mp;
    Eterm end_condition;
    Eterm *lastobj;
    Sint32 max;
    int keypos;
    int all_objects;
    Sint got;
    Sint chunk_size;
};

/*
 * Used by doit_select_count
 */
struct select_count_context {
    Process *p;
    Binary *mp;
    Eterm end_condition;
    Eterm *lastobj;
    Sint32 max;
    int keypos;
    int all_objects;
    Sint got;
};

/*
 * Used by doit_select_delete
 */
struct select_delete_context {
    Process *p;
    DbTableTree *tb;
    Uint accum;
    Binary *mp;
    Eterm end_condition;
    int erase_lastterm;
    TreeDbTerm *lastterm;
    Sint32 max;
    int keypos;
};

/*
** Forward declarations 
*/
static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key, Eterm* key_base);
static TreeDbTerm *linkout_object_tree(DbTableTree *tb, 
				       Eterm object);
static int do_free_tree_cont(DbTableTree *tb, int num_left);
static void free_term(DbTableTree *tb, TreeDbTerm* p);
static int balance_left(TreeDbTerm **this); 
static int balance_right(TreeDbTerm **this); 
static int delsub(TreeDbTerm **this); 
static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot);
static TreeDbTerm *find_node(DbTableTree *tb, Eterm key);
static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key);
static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key, Eterm* kbase);
static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key, Eterm* kbase);
static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*,
					 Eterm key);
static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack*,
					 Eterm key);
static void traverse_backwards(DbTableTree *tb,
			       DbTreeStack*,
			       Eterm lastkey, Eterm* lk_base,
			       int (*doit)(DbTableTree *tb,
					   TreeDbTerm *,
					   void *,
					   int),
			       void *context); 
static void traverse_forward(DbTableTree *tb,
			     DbTreeStack*,
			     Eterm lastkey, Eterm* lk_base,
			     int (*doit)(DbTableTree *tb,
					 TreeDbTerm *,
					 void *,
					 int),
			     void *context); 
static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret,
		     Eterm *partly_bound_key);
static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key, Eterm* bk_base);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, Eterm* b_base, int *done);

static int analyze_pattern(DbTableTree *tb, Eterm pattern, 
			   struct mp_info *mpi);
static int doit_select(DbTableTree *tb,
		       TreeDbTerm *this,
		       void *ptr,
		       int forward);
static int doit_select_count(DbTableTree *tb,
			     TreeDbTerm *this,
			     void *ptr,
			     int forward);
static int doit_select_chunk(DbTableTree *tb,
			     TreeDbTerm *this,
			     void *ptr,
			     int forward);
static int doit_select_delete(DbTableTree *tb,
			      TreeDbTerm *this,
			      void *ptr,
			      int forward);

static int partly_bound_can_match_lesser(Eterm partly_bound_1, 
					 Eterm partly_bound_2);
static int partly_bound_can_match_greater(Eterm partly_bound_1, 
					  Eterm partly_bound_2); 
static int do_partly_bound_can_match_lesser(Eterm a, Eterm b, 
					    int *done);
static int do_partly_bound_can_match_greater(Eterm a, Eterm b, 
					     int *done);
static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3);


/* Method interface functions */
static int db_first_tree(Process *p, DbTable *tbl, 
		  Eterm *ret);
static int db_next_tree(Process *p, DbTable *tbl, 
			Eterm key, Eterm *ret);
static int db_last_tree(Process *p, DbTable *tbl, 
			Eterm *ret);
static int db_prev_tree(Process *p, DbTable *tbl, 
			Eterm key,
			Eterm *ret);
static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail);
static int db_get_tree(Process *p, DbTable *tbl, 
		       Eterm key,  Eterm *ret);
static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_tree(Process *p, DbTable *tbl, 
			       Eterm key,int ndex,
			       Eterm *ret);
static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_tree(Process *p, DbTable *tbl, 
			Eterm slot_term,  Eterm *ret);
static int db_select_tree(Process *p, DbTable *tbl, 
			  Eterm pattern, int reversed, Eterm *ret);
static int db_select_count_tree(Process *p, DbTable *tbl, 
				Eterm pattern,  Eterm *ret);
static int db_select_chunk_tree(Process *p, DbTable *tbl, 
				Eterm pattern, Sint chunk_size,
				int reversed, Eterm *ret);
static int db_select_continue_tree(Process *p, DbTable *tbl,
				   Eterm continuation, Eterm *ret);
static int db_select_count_continue_tree(Process *p, DbTable *tbl,
					 Eterm continuation, Eterm *ret);
static int db_select_delete_tree(Process *p, DbTable *tbl, 
				 Eterm pattern,  Eterm *ret);
static int db_select_delete_continue_tree(Process *p, DbTable *tbl, 
					  Eterm continuation, Eterm *ret);
static int db_take_tree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_tree(int to, void *to_arg,
			  int show, DbTable *tbl);
static int db_free_table_tree(DbTable *tbl);

static int db_free_table_continue_tree(DbTable *tbl);

static void db_foreach_offheap_tree(DbTable *,
				    void (*)(ErlOffHeap *, void *),
				    void *);

static int db_delete_all_objects_tree(Process* p, DbTable* tbl);

#ifdef HARDDEBUG
static void db_check_table_tree(DbTable *tbl);
#endif
static int db_lookup_dbterm_tree(DbTable *, Eterm key, DbUpdateHandle*);
static void db_finalize_dbterm_tree(DbUpdateHandle*);

/*
** Static variables
*/

Export ets_select_reverse_exp;

/*
** External interface 
*/
DbTableMethod db_tree =
{
    db_create_tree,
    db_first_tree,
    db_next_tree,
    db_last_tree,
    db_prev_tree,
    db_put_tree,
    db_get_tree,
    db_get_element_tree,
    db_member_tree,
    db_erase_tree,
    db_erase_object_tree,
    db_slot_tree,
    db_select_chunk_tree,
    db_select_tree, /* why not chunk size=0 ??? */
    db_select_delete_tree,
    db_select_continue_tree,
    db_select_delete_continue_tree,
    db_select_count_tree,
    db_select_count_continue_tree,
    db_take_tree,
    db_delete_all_objects_tree,
    db_free_table_tree,
    db_free_table_continue_tree,
    db_print_tree,
    db_foreach_offheap_tree,
#ifdef HARDDEBUG
    db_check_table_tree,
#else
    NULL,
#endif
    db_lookup_dbterm_tree,
    db_finalize_dbterm_tree

};





void db_initialize_tree(void)
{
    erts_init_trap_export(&ets_select_reverse_exp, am_ets, am_reverse, 3,
			  &ets_select_reverse);
    return;
};

/*
** Table interface routines ie what's called by the bif's 
*/

int db_create_tree(Process *p, DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    tb->root = NULL;
    tb->static_stack.array = erts_db_alloc(ERTS_ALC_T_DB_STK,
					   (DbTable *) tb,
					   sizeof(TreeDbTerm *) * STACK_NEED);
    tb->static_stack.pos = 0;
    tb->static_stack.slot = 0;
    erts_smp_atomic_init_nob(&tb->is_stack_busy, 0);
    tb->deletion = 0;
    return DB_ERROR_NONE;
}

static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    TreeDbTerm *this;

    if (( this = tb->root ) == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    /* Walk down the tree to the left */
    if ((stack = get_static_stack(tb)) != NULL) {
	stack->pos = stack->slot = 0;
    }
    while (this->left != NULL) {
	if (stack) PUSH_NODE(stack, this);
	this = this->left;
    }
    if (stack) {
	PUSH_NODE(stack, this);
	stack->slot = 1;
	release_stack(tb,stack);
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    TreeDbTerm *this;

    if (is_atom(key) && key == am_EOT)
	return DB_ERROR_BADKEY;
    stack = get_any_stack(tb);
    this = find_next(tb, stack, key, NULL);
    release_stack(tb,stack);
    if (this == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm *this;
    DbTreeStack* stack;

    if (( this = tb->root ) == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    /* Walk down the tree to the right */
    if ((stack = get_static_stack(tb)) != NULL) {
	stack->pos = stack->slot = 0;
    }    
    while (this->right != NULL) {
	if (stack) PUSH_NODE(stack, this);
	this = this->right;
    }
    if (stack) {
	PUSH_NODE(stack, this);
	stack->slot = NITEMS(tb);
	release_stack(tb,stack);
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm *this;
    DbTreeStack* stack;

    if (is_atom(key) && key == am_EOT)
	return DB_ERROR_BADKEY;
    stack = get_any_stack(tb);
    this = find_prev(tb, stack, key, NULL);
    release_stack(tb,stack);
    if (this == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static ERTS_INLINE Sint cmp_key(DbTableTree* tb, Eterm key, Eterm* key_base,
			       TreeDbTerm* obj)
{
    return cmp_rel(key, key_base,
		   GETKEY(tb,obj->dbterm.tpl), obj->dbterm.tpl);
}

static ERTS_INLINE int cmp_key_eq(DbTableTree* tb, Eterm key, Eterm* key_base,
				  TreeDbTerm* obj)
{
    Eterm obj_key = GETKEY(tb,obj->dbterm.tpl);
    return is_same(key, key_base, obj_key, obj->dbterm.tpl)
	|| cmp_rel(key, key_base, obj_key, obj->dbterm.tpl) == 0;
}

static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableTree *tb = &tbl->tree;
    /* Non recursive insertion in AVL tree, building our own stack */
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = &tb->root;
    Sint c;
    Eterm key;
    int dir;
    TreeDbTerm *p1, *p2, *p;

    key = GETKEY(tb, tuple_val(obj));

    reset_static_stack(tb);

    dstack[dpos++] = DIR_END;
    for (;;)
	if (!*this) { /* Found our place */
	    state = 1;
	    if (erts_smp_atomic_inc_read_nob(&tb->common.nitems) >= TREE_MAX_ELEMENTS) {
		erts_smp_atomic_dec_nob(&tb->common.nitems);
		return DB_ERROR_SYSRES;
	    }
	    *this = new_dbterm(tb, obj);
	    (*this)->balance = 0;
	    (*this)->left = (*this)->right = NULL;
	    break;
	} else if ((c = cmp_key(tb, key, NULL, *this)) < 0) {
	    /* go lefts */
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else if (!key_clash_fail) { /* Equal key and this is a set, replace. */
	    *this = replace_dbterm(tb, *this, obj);
	    break;
	} else {
	    return DB_ERROR_BADKEY; /* key already exists */
	}

    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	p = *this;
	if (dir == DIR_LEFT) {
	    switch (p->balance) {
	    case 1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = -1;
		break;
	    case -1: /* The icky case */
		p1 = p->left;
		if (p1->balance == -1) { /* Single LL rotation */
		    p->left = p1->right;
		    p1->right = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RR rotation */
		    p2 = p1->right;
		    p1->right = p2->left;
		    p2->left = p1;
		    p->left = p2->right;
		    p2->right = p;
		    p->balance = (p2->balance == -1) ? +1 : 0;
		    p1->balance = (p2->balance == 1) ? -1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	} else { /* dir == DIR_RIGHT */
	    switch (p->balance) {
	    case -1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = 1;
		break;
	    case 1:
		p1 = p->right;
		if (p1->balance == 1) { /* Single RR rotation */
		    p->right = p1->left;
		    p1->left = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RL rotation */
		    p2 = p1->left;
		    p1->left = p2->right;
		    p2->right = p1;
		    p->right = p2->left;
		    p2->left = p;
		    p->balance = (p2->balance == 1) ? -1 : 0;
		    p1->balance = (p2->balance == -1) ? 1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0; 
		state = 0;
		break;
	    }
	}
    }
    return DB_ERROR_NONE;
}

static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    Eterm copy;
    Eterm *hp, *hend;
    TreeDbTerm *this;

    /*
     * This is always a set, so we know exactly how large
     * the data is when we have found it.
     * The list created around it is purely for interface conformance.
     */
    
    this = find_node(tb,key);
    if (this == NULL) {
	*ret = NIL;
    } else {
	hp = HAlloc(p, this->dbterm.size + 2);
	hend = hp + this->dbterm.size + 2;
	copy = db_copy_object_from_ets(&tb->common, &this->dbterm, &hp, &MSO(p));
	*ret = CONS(hp, copy, NIL);
	hp += 2;
	HRelease(p,hend,hp);
    }
    return DB_ERROR_NONE;
}

static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;

    *ret = (find_node(tb,key) == NULL) ? am_false : am_true;
    return DB_ERROR_NONE;
}

static int db_get_element_tree(Process *p, DbTable *tbl,
			       Eterm key, int ndex, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    /*
     * Look the node up:
     */
    Eterm *hp;
    TreeDbTerm *this;

    /*
     * This is always a set, so we know exactly how large
     * the data is when we have found it.
     * No list is created around elements in set's so there are no list
     * around the element here either.
     */
    
    this = find_node(tb,key);
    if (this == NULL) {
	return DB_ERROR_BADKEY;
    } else {
	if (ndex > arityval(this->dbterm.tpl[0])) {
	    return DB_ERROR_BADPARAM;
	}
	*ret = db_copy_element_from_ets(&tb->common, p, &this->dbterm, ndex, &hp, 0);
    }
    return DB_ERROR_NONE;
}

static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm *res;

    *ret = am_true;

    if ((res = linkout_tree(tb, key, NULL)) != NULL) {
	free_term(tb, res);
    }
    return DB_ERROR_NONE;
}

static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm *res;

    *ret = am_true;

    if ((res = linkout_object_tree(tb, object)) != NULL) {
	free_term(tb, res);
    }
    return DB_ERROR_NONE;
}


static int db_slot_tree(Process *p, DbTable *tbl, 
			Eterm slot_term, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    Sint slot;
    TreeDbTerm *st;
    Eterm *hp, *hend;
    Eterm copy;

    /*
     * The notion of a "slot" is not natural in a tree, but we try to
     * simulate it by giving the n'th node in the tree instead.
     * Traversing a tree in this way is not very convenient, but by
     * using the saved stack we at least sometimes will get acceptable 
     * performance.
     */

    if (is_not_small(slot_term) ||
	((slot = signed_val(slot_term)) < 0) ||
	(slot > NITEMS(tb)))
	return DB_ERROR_BADPARAM;

    if (slot == NITEMS(tb)) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }

    /* 
     * We use the slot position and search from there, slot positions 
     * are counted from 1 and up.
     */
    ++slot;
    st = slot_search(p, tb, slot); 
    if (st == NULL) {
	*ret = am_false;
	return DB_ERROR_UNSPEC;
    }
    hp = HAlloc(p, st->dbterm.size + 2);
    hend = hp + st->dbterm.size + 2;
    copy = db_copy_object_from_ets(&tb->common, &st->dbterm, &hp, &MSO(p));
    *ret = CONS(hp, copy, NIL);
    hp += 2;
    HRelease(p,hend,hp);
    return DB_ERROR_NONE;
}



static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3)
{
    Process *p = BIF_P;
    Eterm a1 = BIF_ARG_1;
    Eterm a2 = BIF_ARG_2;
    Eterm a3 = BIF_ARG_3;
    Eterm list;
    Eterm result;
    Eterm* hp;
    Eterm* hend;

    int max_iter = CONTEXT_REDS * 10;

    if (is_nil(a1)) {
	hp = HAlloc(p, 3);
	BIF_RET(TUPLE2(hp,a2,a3));
    } else if (is_not_list(a1)) {
    error:
	BIF_ERROR(p, BADARG);
    }
    
    list = a1;
    result = a2;
    hp = hend = NULL;
    while (is_list(list)) {
	Eterm* pair = list_val(list);
	if (--max_iter == 0) {
	    BUMP_ALL_REDS(p);
	    HRelease(p, hend, hp);
	    BIF_TRAP3(&ets_select_reverse_exp, p, list, result, a3);
	}
	if (hp == hend) {
	    hp = HAlloc(p, 64);
	    hend = hp + 64;
	}
	result = CONS(hp, CAR(pair), result);
	hp += 2;
	list = CDR(pair);
    }
    if (is_not_nil(list))  {
	goto error;
    }
    HRelease(p, hend, hp);
    BUMP_REDS(p,CONTEXT_REDS - max_iter / 10);
    hp = HAlloc(p,3);
    BIF_RET(TUPLE2(hp, result, a3));
}

static BIF_RETTYPE bif_trap1(Export *bif,
			     Process *p, 
			     Eterm p1) 
{
    BIF_TRAP1(bif, p, p1);
}
    
static BIF_RETTYPE bif_trap3(Export *bif,
			     Process *p, 
			     Eterm p1, 
			     Eterm p2,
			     Eterm p3) 
{
    BIF_TRAP3(bif, p, p1, p2, p3);
}
    
/*
** This is called either when the select bif traps or when ets:select/1 
** is called. It does mostly the same as db_select_tree and may in either case
** trap to itself again (via the ets:select/1 bif).
** Note that this is common for db_select_tree and db_select_chunk_tree.
*/
static int db_select_continue_tree(Process *p, 
				   DbTable *tbl,
				   Eterm continuation,
				   Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    struct select_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Sint chunk_size;
    Sint reverse;


#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);

    /* Decode continuation. We know it's a tuple but not the arity or 
       anything else */

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 8)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    
    if (!is_small(tptr[4]) || !is_binary(tptr[5]) || 
	!(is_list(tptr[6]) || tptr[6] == NIL) || !is_small(tptr[7]) ||
	!is_small(tptr[8]))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    
    lastkey = tptr[2];
    end_condition = tptr[3];
    if (!(thing_subtag(*binary_val(tptr[5])) == REFC_BINARY_SUBTAG))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    mp = ((ProcBin *) binary_val(tptr[5]))->val;
    if (!IsMatchProgBinary(mp))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    chunk_size = signed_val(tptr[4]);

    sc.p = p;
    sc.accum = tptr[6];
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.lastobj = NULL;
    sc.max = 1000;
    sc.keypos = tb->common.keypos;
    sc.all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS;
    sc.chunk_size = chunk_size;
    reverse = unsigned_val(tptr[7]);
    sc.got = signed_val(tptr[8]);

    stack = get_any_stack(tb);
    if (chunk_size) {
	if (reverse) {
	    traverse_backwards(tb, stack, lastkey, NULL, &doit_select_chunk, &sc);
	} else {
	    traverse_forward(tb, stack, lastkey, NULL, &doit_select_chunk, &sc);
	}
    } else {
	if (reverse) {
	    traverse_forward(tb, stack, lastkey, NULL, &doit_select, &sc);
	} else {
	    traverse_backwards(tb, stack, lastkey, NULL, &doit_select, &sc);
	}
    }
    release_stack(tb,stack);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
	if (chunk_size) {
	    Eterm *hp; 
	    unsigned sz;

	    if (sc.got < chunk_size || sc.lastobj == NULL) { 
		/* end of table, sc.lastobj may be NULL as we may have been
		   at the very last object in the table when trapping. */
		if (!sc.got) {
		    RET_TO_BIF(am_EOT, DB_ERROR_NONE);
		} else {
		    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
					 sc.accum, NIL, am_EOT), 
			       DB_ERROR_NONE);
		}
	    }

	    key = GETKEY(tb, sc.lastobj);
	    sz = size_object_rel(key,sc.lastobj);
	    hp = HAlloc(p, 9 + sz);
	    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
	    continuation = TUPLE8
		(hp,
		 tptr[1],
		 key,
		 tptr[3], 
		 tptr[4],
		 tptr[5],
		 NIL,
		 tptr[7],
		 make_small(0));
	    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
				 sc.accum, NIL, continuation), 
		       DB_ERROR_NONE);
	} else {
	    RET_TO_BIF(sc.accum, DB_ERROR_NONE);
	}
    }	
    key = GETKEY(tb, sc.lastobj);
    if (chunk_size) {
	if (end_condition != NIL && 
	    ((!reverse && cmp_partly_bound(end_condition,key,sc.lastobj) < 0) ||
	     (reverse && cmp_partly_bound(end_condition,key,sc.lastobj) > 0))) {
	    /* done anyway */
	    if (!sc.got) {
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    } else {
		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p, 
				     sc.accum, NIL, am_EOT), 
			   DB_ERROR_NONE);
	    }
	}
    } else {
	if (end_condition != NIL && 
	    ((!reverse && cmp_partly_bound(end_condition,key,sc.lastobj) > 0) ||
	     (reverse && cmp_partly_bound(end_condition,key,sc.lastobj) < 0))) {
	    /* done anyway */
	    RET_TO_BIF(sc.accum,DB_ERROR_NONE);
	}
    }
    /* Not done yet, let's trap. */
    sz = size_object_rel(key,sc.lastobj);
    hp = HAlloc(p, 9 + sz);
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
    continuation = TUPLE8
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 tptr[5],
	 sc.accum,
	 tptr[7],
	 make_small(sc.got));
    RET_TO_BIF(bif_trap1(bif_export[BIF_ets_select_1], p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}


static int db_select_tree(Process *p, DbTable *tbl, 
			  Eterm pattern, int reverse, Eterm *ret)
{
    /* Strategy: Traverse backwards to build resulting list from tail to head */
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    struct select_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm* lk_base = NULL;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = NIL;
    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->common.keypos;
    sc.got = 0;
    sc.chunk_size = 0;

    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(NIL,DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;
    sc.all_objects = mpi.all_objects;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP(mpi.least,mpi.most) == 0) {
	doit_select(tb,mpi.save_term,&sc,0 /* direction doesn't matter */);
	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
    }

    stack = get_any_stack(tb);
    if (reverse) {
	if (mpi.some_limitation) {
	    if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
		lk_base = this->dbterm.tpl;
	    }
	    sc.end_condition = mpi.most;
	}
	traverse_forward(tb, stack, lastkey, lk_base, &doit_select, &sc);
    } else {
	if (mpi.some_limitation) {
	    if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
		lk_base = this->dbterm.tpl;
	    }
	    sc.end_condition = mpi.least;
	}
	traverse_backwards(tb, stack, lastkey, lk_base, &doit_select, &sc);
    }
    release_stack(tb,stack);
#ifdef HARDDEBUG
	erts_fprintf(stderr,"Least: %T\n", mpi.least);
	erts_fprintf(stderr,"Most: %T\n", mpi.most);
#endif
    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0) {
	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object_rel(key, sc.lastobj);
    hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
    if (mpi.all_objects)
	(mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
    mpb=db_make_mp_binary(p,mpi.mp,&hp);
	    
    continuation = TUPLE8
	(hp,
	 tb->common.id,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 make_small(0), /* Chunk size of zero means not chunked to the
			   continuation BIF */
	 mpb,
	 sc.accum,
	 make_small(reverse),
	 make_small(sc.got));

    /* Don't free mpi.mp, so don't use macro */
    *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

    
/*
** This is called either when the select_count bif traps.
*/
static int db_select_count_continue_tree(Process *p, 
					 DbTable *tbl,
					 Eterm continuation,
					 Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    struct select_count_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Eterm egot;


#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);

    /* Decode continuation. We know it's a tuple and everything else as
     this is only called by ourselves */

    /* continuation: 
       {Table, Lastkey, EndCondition, MatchProgBin, HowManyGot}*/

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 5)
	erl_exit(1,"Internal error in ets:select_count/1");
    
    lastkey = tptr[2];
    end_condition = tptr[3];
    if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    mp = ((ProcBin *) binary_val(tptr[4]))->val;
    if (!IsMatchProgBinary(mp))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);

    sc.p = p;
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.lastobj = NULL;
    sc.max = 1000;
    sc.keypos = tb->common.keypos;
    if (is_big(tptr[5])) {
	sc.got = big_to_uint32(tptr[5]);
    } else {
	sc.got = unsigned_val(tptr[5]);
    }

    stack = get_any_stack(tb);
    traverse_backwards(tb, stack, lastkey, NULL, &doit_select_count, &sc);
    release_stack(tb,stack);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
    }	
    key = GETKEY(tb, sc.lastobj);
    if (end_condition != NIL && 
	(cmp_partly_bound(end_condition,key,sc.lastobj) > 0)) {
	/* done anyway */
	RET_TO_BIF(make_small(sc.got),DB_ERROR_NONE);
    }
    /* Not done yet, let's trap. */
    sz = size_object_rel(key, sc.lastobj);
    if (IS_USMALL(0, sc.got)) {
	hp = HAlloc(p, sz + 6);
	egot = make_small(sc.got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
	egot = uint_to_big(sc.got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
    continuation = TUPLE5
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 egot);
    RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}


static int db_select_count_tree(Process *p, DbTable *tbl, 
				Eterm pattern, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    struct select_count_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm* lk_base = NULL;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm egot;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->common.keypos;
    sc.got = 0;

    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0),DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;
    sc.all_objects = mpi.all_objects;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP(mpi.least,mpi.most) == 0) {
	doit_select_count(tb,mpi.save_term,&sc,0 /* dummy */);
	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
    }

    stack = get_any_stack(tb);
    if (mpi.some_limitation) {
	if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
	    lastkey = GETKEY(tb, this->dbterm.tpl);
	    lk_base = this->dbterm.tpl;
	}
	sc.end_condition = mpi.least;
    }
    
    traverse_backwards(tb, stack, lastkey, lk_base, &doit_select_count, &sc);
    release_stack(tb,stack);
    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object_rel(key, sc.lastobj);
    if (IS_USMALL(0, sc.got)) {
	hp = HAlloc(p, sz + PROC_BIN_SIZE + 6);
	egot = make_small(sc.got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6);
	egot = uint_to_big(sc.got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
    if (mpi.all_objects)
	(mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
    mpb = db_make_mp_binary(p,mpi.mp,&hp);
	    
    continuation = TUPLE5
	(hp,
	 tb->common.id,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 mpb,
	 egot);

    /* Don't free mpi.mp, so don't use macro */
    *ret = bif_trap1(&ets_select_count_continue_exp, p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_chunk_tree(Process *p, DbTable *tbl, 
				Eterm pattern, Sint chunk_size,
				int reverse,
				Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack;
    struct select_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm* lk_base = NULL;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 		\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);		\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = NIL;
    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->common.keypos;
    sc.got = 0;
    sc.chunk_size = chunk_size;

    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(am_EOT,DB_ERROR_NONE);
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;
    sc.all_objects = mpi.all_objects;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP(mpi.least,mpi.most) == 0) {
	doit_select(tb,mpi.save_term,&sc, 0 /* direction doesn't matter */);
	if (sc.accum != NIL) {
	    hp=HAlloc(p, 3);
	    RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
	} else {
	    RET_TO_BIF(am_EOT,DB_ERROR_NONE);
	}
    }

    stack = get_any_stack(tb);
    if (reverse) {
	if (mpi.some_limitation) {
	    if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
		lk_base = this->dbterm.tpl;
	    }
	    sc.end_condition = mpi.least;
	}
	traverse_backwards(tb, stack, lastkey, lk_base, &doit_select_chunk, &sc);
    } else {
	if (mpi.some_limitation) {
	    if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
		lk_base = this->dbterm.tpl;
	    }
	    sc.end_condition = mpi.most;
	}
	traverse_forward(tb, stack, lastkey, lk_base, &doit_select_chunk, &sc);
    }
    release_stack(tb,stack);

    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0 || sc.got == chunk_size) {
	Eterm *hp; 
	unsigned sz;

	if (sc.got < chunk_size ||
	    sc.lastobj == NULL) { 
	    /* We haven't got all and we haven't trapped 
	       which should mean we are at the end of the 
	       table, sc.lastobj may be NULL if the table was empty */
	    
	    if (!sc.got) {
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    } else {
		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
				     sc.accum, NIL, am_EOT), 
			   DB_ERROR_NONE);
	    }
	}

	key = GETKEY(tb, sc.lastobj);
	sz = size_object_rel(key, sc.lastobj);
	hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
	key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);
	if (mpi.all_objects)
	    (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
	mpb = db_make_mp_binary(p,mpi.mp,&hp);
	
	continuation = TUPLE8
	    (hp,
	     tb->common.id,
	     key,
	     sc.end_condition, /* From the match program, 
				  needn't be copied */
	     make_small(chunk_size),
	     mpb,
	     NIL,
	     make_small(reverse),
	     make_small(0));
	/* Don't let RET_TO_BIF macro free mpi.mp*/
	*ret = bif_trap3(&ets_select_reverse_exp, p,
			 sc.accum, NIL, continuation);
	return DB_ERROR_NONE; 
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object_rel(key, sc.lastobj);
    hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastobj, NULL);

    if (mpi.all_objects)
	(mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
    mpb = db_make_mp_binary(p,mpi.mp,&hp);    
    continuation = TUPLE8
	(hp,
	 tb->common.id,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 make_small(chunk_size),
	 mpb,
	 sc.accum,
	 make_small(reverse),
	 make_small(sc.got));
    /* Don't let RET_TO_BIF macro free mpi.mp*/
    *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation);
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

/*
** This is called when select_delete traps
*/
static int db_select_delete_continue_tree(Process *p, 
					  DbTable *tbl,
					  Eterm continuation,
					  Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    struct select_delete_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Eterm eaccsum;


#define RET_TO_BIF(Term, State) do { 		\
	if (sc.erase_lastterm) {		\
	    free_term(tb, sc.lastterm);		\
	}					\
	*ret = (Term); 				\
	return State; 				\
    } while(0);

    /* Decode continuation. We know it's correct, this can only be called
       by trapping */

    tptr = tuple_val(continuation);

    lastkey = tptr[2];
    end_condition = tptr[3];

    sc.erase_lastterm = 0; /* Before first RET_TO_BIF */
    sc.lastterm = NULL;

    mp = ((ProcBin *) binary_val(tptr[4]))->val;
    sc.p = p;
    sc.tb = tb;
    if (is_big(tptr[5])) {
	sc.accum = big_to_uint32(tptr[5]);
    } else {
	sc.accum = unsigned_val(tptr[5]);
    }
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.max = 1000;
    sc.keypos = tb->common.keypos;

    ASSERT(!erts_smp_atomic_read_nob(&tb->is_stack_busy));
    traverse_backwards(tb, &tb->static_stack, lastkey, NULL, &doit_select_delete, &sc);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
    }	
    key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
    if (end_condition != NIL && 
	cmp_partly_bound(end_condition,key,sc.lastterm->dbterm.tpl) > 0) { /* done anyway */
	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
    }
    /* Not done yet, let's trap. */
    sz = size_object_rel(key, sc.lastterm->dbterm.tpl);
    if (IS_USMALL(0, sc.accum)) {
	hp = HAlloc(p, sz + 6);
	eaccsum = make_small(sc.accum);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
	eaccsum = uint_to_big(sc.accum, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastterm->dbterm.tpl, NULL);
    continuation = TUPLE5
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 eaccsum);
    RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}

static int db_select_delete_tree(Process *p, DbTable *tbl, 
				 Eterm pattern, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    struct select_delete_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm* lk_base = NULL;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;
    Eterm eaccsum;

#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	if (sc.erase_lastterm) {                \
	    free_term(tb, sc.lastterm);         \
	}                                       \
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = 0;
    sc.erase_lastterm = 0;
    sc.lastterm = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->common.keypos;
    sc.tb = tb;
    
    if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(0,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0),DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP(mpi.least,mpi.most) == 0) {
	doit_select_delete(tb,mpi.save_term,&sc, 0 /* direction doesn't 
						      matter */);
	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
    }

    if (mpi.some_limitation) {
	if ((this = find_next_from_pb_key(tb, &tb->static_stack, mpi.most)) != NULL) {
	    lastkey = GETKEY(tb, this->dbterm.tpl);
	    lk_base = this->dbterm.tpl;
	}
	sc.end_condition = mpi.least;
    }

    traverse_backwards(tb, &tb->static_stack, lastkey, lk_base, &doit_select_delete, &sc);
    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
    }

    key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
    sz = size_object_rel(key, sc.lastterm->dbterm.tpl);
    if (IS_USMALL(0, sc.accum)) {
	hp = HAlloc(p, sz + PROC_BIN_SIZE + 6);
	eaccsum = make_small(sc.accum);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6);
	eaccsum = uint_to_big(sc.accum, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct_rel(key, sz, &hp, &MSO(p), sc.lastterm->dbterm.tpl, NULL);
    mpb = db_make_mp_binary(p,mpi.mp,&hp);
    
    continuation = TUPLE5
	(hp,
	 tb->common.id,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 mpb,
	 eaccsum);

    /* Don't free mpi.mp, so don't use macro */
    if (sc.erase_lastterm) {
	free_term(tb, sc.lastterm);
    }
    *ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm *this;

    *ret = NIL;
    this = linkout_tree(tb, key, NULL);
    if (this) {
        Eterm copy, *hp, *hend;

        hp = HAlloc(p, this->dbterm.size + 2);
        hend = hp + this->dbterm.size + 2;
        copy = db_copy_object_from_ets(&tb->common,
                                       &this->dbterm, &hp, &MSO(p));
        *ret = CONS(hp, copy, NIL);
        hp += 2;
        HRelease(p, hend, hp);
        free_term(tb, this);
    }
    return DB_ERROR_NONE;
}

/*
** Other interface routines (not directly coupled to one bif)
*/


/* Display tree contents (for dump) */
static void db_print_tree(int to, void *to_arg, 
			  int show,
			  DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
#ifdef TREE_DEBUG
    if (show)
	erts_print(to, to_arg, "\nTree data dump:\n"
		   "------------------------------------------------\n");
    do_dump_tree2(&tbl->tree, to, to_arg, show, tb->root, 0);
    if (show)
	erts_print(to, to_arg, "\n"
		   "------------------------------------------------\n");
#else
    erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tb));
#endif
}

/* release all memory occupied by a single table */
static int db_free_table_tree(DbTable *tbl)
{
    while (!db_free_table_continue_tree(tbl))
	;
    return 1;
}

static int db_free_table_continue_tree(DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    int result;

    if (!tb->deletion) {
	tb->static_stack.pos = 0;
	tb->deletion = 1;
	PUSH_NODE(&tb->static_stack, tb->root);
    }
    result = do_free_tree_cont(tb, DELETE_RECORD_LIMIT);
    if (result) {		/* Completely done. */
	erts_db_free(ERTS_ALC_T_DB_STK,
		     (DbTable *) tb,
		     (void *) tb->static_stack.array,
		     sizeof(TreeDbTerm *) * STACK_NEED);
	ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size)
	       == sizeof(DbTable));
    }
    return result;
}

static int db_delete_all_objects_tree(Process* p, DbTable* tbl)
{
    db_free_table_tree(tbl);
    db_create_tree(p, tbl);
    erts_smp_atomic_set_nob(&tbl->tree.common.nitems, 0);
    return 0;
}

static void do_db_tree_foreach_offheap(TreeDbTerm *,
				       void (*)(ErlOffHeap *, void *),
				       void *);

static void db_foreach_offheap_tree(DbTable *tbl,
				    void (*func)(ErlOffHeap *, void *),
				    void * arg)
{
    do_db_tree_foreach_offheap(tbl->tree.root, func, arg);
}


/*
** Functions for internal use
*/


static void
do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
			   void (*func)(ErlOffHeap *, void *),
			   void * arg)
{
    ErlOffHeap tmp_offheap;
    if(!tdbt)
	return;
    do_db_tree_foreach_offheap(tdbt->left, func, arg);
    tmp_offheap.first = tdbt->dbterm.first_oh;
    tmp_offheap.overhead = 0;
    (*func)(&tmp_offheap, arg);
    tdbt->dbterm.first_oh = tmp_offheap.first;
    do_db_tree_foreach_offheap(tdbt->right, func, arg);
}

static TreeDbTerm *linkout_tree(DbTableTree *tb, 
				Eterm key, Eterm* key_base)
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = &tb->root;
    Sint c;
    int dir;
    TreeDbTerm *q = NULL;

    /*
     * Somewhat complicated, deletion in an AVL tree,
     * The two helpers balance_left and balance_right are used to
     * keep the balance. As in insert, we do the stacking ourselves.
     */

    reset_static_stack(tb);
    dstack[dpos++] = DIR_END;
    for (;;) {
	if (!*this) { /* Failure */
	    return NULL;
	} else if ((c = cmp_key(tb, key, key_base, *this)) < 0) {
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else { /* Equal key, found the one to delete*/
	    q = (*this);
	    if (q->right == NULL) {
		(*this) = q->left;
		state = 1;
	    } else if (q->left == NULL) {
		(*this) = q->right;
		state = 1;
	    } else {
		dstack[dpos++] = DIR_LEFT;
		tstack[tpos++] = this;
		state = delsub(this);
	    }
	    erts_smp_atomic_dec_nob(&tb->common.nitems);
	    break;
	}
    }
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	if (dir == DIR_LEFT) {
	    state = balance_left(this);
	} else {
	    state = balance_right(this);
	}
    }
    return q;
}

static TreeDbTerm *linkout_object_tree(DbTableTree *tb, 
				       Eterm object)
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = &tb->root;
    Sint c;
    int dir;
    TreeDbTerm *q = NULL;
    Eterm key;

    /*
     * Somewhat complicated, deletion in an AVL tree,
     * The two helpers balance_left and balance_right are used to
     * keep the balance. As in insert, we do the stacking ourselves.
     */

    
    key = GETKEY(tb, tuple_val(object));

    reset_static_stack(tb);
    dstack[dpos++] = DIR_END;
    for (;;) {
	if (!*this) { /* Failure */
	    return NULL;
	} else if ((c = cmp_key(tb,key,NULL,*this)) < 0) {
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else { /* Equal key, found the only possible matching object*/
	    if (!db_eq(&tb->common,object,&(*this)->dbterm)) {
		return NULL;
	    }
	    q = (*this);
	    if (q->right == NULL) {
		(*this) = q->left;
		state = 1;
	    } else if (q->left == NULL) {
		(*this) = q->right;
		state = 1;
	    } else {
		dstack[dpos++] = DIR_LEFT;
		tstack[tpos++] = this;
		state = delsub(this);
	    }
	    erts_smp_atomic_dec_nob(&tb->common.nitems);
	    break;
	}
    }
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	if (dir == DIR_LEFT) {
	    state = balance_left(this);
	} else {
	    state = balance_right(this);
	}
    }
    return q;
}

/*
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableTree *tb, Eterm pattern, 
			   struct mp_info *mpi)
{
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];
    Eterm *buff = sbuff;
    Eterm *ptpl;
    int i;
    int num_heads = 0;
    Eterm key;
    Eterm partly_bound;
    int res;
    Eterm least = 0;
    Eterm most = 0;

    mpi->some_limitation = 1;
    mpi->got_partial = 0;
    mpi->something_can_match = 0;
    mpi->mp = NULL;
    mpi->all_objects = 1;
    mpi->save_term = NULL;

    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
	++num_heads;

    if (lst != NIL) {/* proper list... */
	return DB_ERROR_BADPARAM;
    }
    if (num_heads > 10) {
	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
    }

    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
	Eterm body;
	ttpl = CAR(list_val(lst));
	if (!is_tuple(ttpl)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	ptpl = tuple_val(ttpl);
	if (ptpl[0] != make_arityval(3U)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	matches[i] = tpl = ptpl[1];
	guards[i] = ptpl[2];
	bodies[i] = body = ptpl[3];
	if (!is_list(body) || CDR(list_val(body)) != NIL ||
	    CAR(list_val(body)) != am_DollarUnderscore) {
	    mpi->all_objects = 0;
	}
	++i;

	partly_bound = NIL;
	res = key_given(tb, tpl, &mpi->save_term, &partly_bound);
	if ( res >= 0 ) {   /* Can match something */
	    key = 0;
	    mpi->something_can_match = 1;
	    if (res > 0) {
		key = GETKEY(tb,tuple_val(tpl)); 
	    } else if (partly_bound != NIL) {
		mpi->got_partial = 1;
		key = partly_bound;
	    } else {
		mpi->some_limitation = 0;
	    }
	    if (key != 0) {
		if (least == 0 || 
		    partly_bound_can_match_lesser(key,least)) {
		    least = key;
		}
		if (most == 0 || 
		    partly_bound_can_match_greater(key,most)) {
		    most = key;
		}
	    }
	}
    }
    mpi->least = least;
    mpi->most = most;

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad 
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
				    num_heads, DCOMP_TABLE, NULL)) 
	== NULL) {
	if (buff != sbuff) { 
	    erts_free(ERTS_ALC_T_DB_TMP, buff);
	}
	return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) { 
	erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}

static int do_free_tree_cont(DbTableTree *tb, int num_left)
{
    TreeDbTerm *root;
    TreeDbTerm *p;

    for (;;) {
	root = POP_NODE(&tb->static_stack);
	if (root == NULL) break;
	for (;;) {
	    if ((p = root->left) != NULL) {
		root->left = NULL;
		PUSH_NODE(&tb->static_stack, root);
		root = p;
	    } else if ((p = root->right) != NULL) {
		root->right = NULL;
		PUSH_NODE(&tb->static_stack, root);
		root = p;
	    } else {
		free_term(tb, root);
		if (--num_left > 0) {
		    break;
		} else {
		    return 0;	/* Done enough for now */
		}
	    }
	}
    }
    return 1;
}

/*
 * Deletion helpers
 */
static int balance_left(TreeDbTerm **this) 
{
    TreeDbTerm *p, *p1, *p2;
    int b1, b2, h = 1;
    
    p = *this;
    switch (p->balance) {
    case -1:
	p->balance = 0;
	break;
    case 0:
	p->balance = 1;
	h = 0;
	break;
    case 1:
	p1 = p->right;
	b1 = p1->balance;
	if (b1 >= 0) { /* Single RR rotation */
	    p->right = p1->left;
	    p1->left = p;
	    if (b1 == 0) {
		p->balance = 1;
		p1->balance = -1;
		h = 0;
	    } else {
		p->balance = p1->balance = 0;
	    }
	    (*this) = p1;
	} else { /* Double RL rotation */
	    p2 = p1->left;
	    b2 = p2->balance;
	    p1->left = p2->right;
	    p2->right = p1;
	    p->right = p2->left;
	    p2->left = p;
	    p->balance = (b2 == 1) ? -1 : 0;
	    p1->balance = (b2 == -1) ? 1 : 0;
	    p2->balance = 0;
	    (*this) = p2;
	}
	break;
    }
    return h;
}

static int balance_right(TreeDbTerm **this) 
{
    TreeDbTerm *p, *p1, *p2;
    int b1, b2, h = 1;
    
    p = *this;
    switch (p->balance) {
    case 1:
	p->balance = 0;
	break;
    case 0:
	p->balance = -1;
	h = 0;
	break;
    case -1:
	p1 = p->left;
	b1 = p1->balance;
	if (b1 <= 0) { /* Single LL rotation */
	    p->left = p1->right;
	    p1->right = p;
	    if (b1 == 0) {
		p->balance = -1;
		p1->balance = 1;
		h = 0;
	    } else {
		p->balance = p1->balance = 0;
	    }
	    (*this) = p1;
	} else { /* Double LR rotation */
	    p2 = p1->right;
	    b2 = p2->balance;
	    p1->right = p2->left;
	    p2->left = p1;
	    p->left = p2->right;
	    p2->right = p;
	    p->balance = (b2 == -1) ? 1 : 0;
	    p1->balance = (b2 == 1) ? -1 : 0;
	    p2->balance = 0;
	    (*this) = p2;
	}
    }
    return h;
}

static int delsub(TreeDbTerm **this) 
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    TreeDbTerm *q = (*this);
    TreeDbTerm **r = &(q->left);
    int h;

    /*
     * Walk down the tree to the right and search 
     * for a void right child, pick that child out
     * and return it to be put in the deleted 
     * object's place.
     */
    
    while ((*r)->right != NULL) {
	tstack[tpos++] = r;
	r = &((*r)->right);
    }
    *this = *r;
    *r = (*r)->left;
    (*this)->left = q->left;
    (*this)->right = q->right;
    (*this)->balance = q->balance;
    tstack[0] = &((*this)->left);
    h = 1;
    while (tpos && h) {
	r = tstack[--tpos];
	h = balance_right(r);
    }
    return h;
}

/*
 * Helper for db_slot
 */

static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    DbTreeStack* stack = get_any_stack(tb);
    ASSERT(stack != NULL);

    if (slot == 1) { /* Don't search from where we are if we are 
			looking for the first slot */
	stack->slot = 0;
    }

    if (stack->slot == 0) { /* clear stack if slot positions 
				are not recorded */
	stack->pos = 0;
    }
    if (EMPTY_NODE(stack)) {
	this = tb->root;
	if (this == NULL)
	    goto done;
	while (this->left != NULL){
	    PUSH_NODE(stack, this);
	    this = this->left;
	}
	PUSH_NODE(stack, this);
	stack->slot = 1;
    }
    this = TOP_NODE(stack);
    while (stack->slot != slot && this != NULL) {
	if (slot > stack->slot) {
	    if (this->right != NULL) {
		this = this->right;
		while (this->left != NULL) {
		    PUSH_NODE(stack, this);
		    this = this->left;
		}
		PUSH_NODE(stack, this);
	    } else {
		for (;;) {
		    tmp = POP_NODE(stack);
		    this = TOP_NODE(stack);
		    if (this == NULL || this->left == tmp)
			break;
		}
	    }		
	    ++(stack->slot);
	} else {
	    if (this->left != NULL) {
		this = this->left;
		while (this->right != NULL) {
		    PUSH_NODE(stack, this);
		    this = this->right;
		}
		PUSH_NODE(stack, this);
	    } else {
		for (;;) {
		    tmp = POP_NODE(stack);
		    this = TOP_NODE(stack);
		    if (this == NULL || this->right == tmp)
			break;
		}
	    }		
	    --(stack->slot);
	}
    }
done:
    release_stack(tb,stack);
    return this;
}

/*
 * Find next and previous in sort order
 */

static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack,
			     Eterm key, Eterm* key_base)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    if(( this = TOP_NODE(stack)) != NULL) {
	if (!cmp_key_eq(tb,key,key_base,this)) {
	    /* Start from the beginning */
	    stack->pos = stack->slot = 0;
	}
    }
    if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
	if (( this = tb->root ) == NULL)
	    return NULL;
	for (;;) {
	    PUSH_NODE(stack, this);
	    if (( c = cmp_key(tb,key,key_base,this) ) > 0) {
		if (this->right == NULL) /* We are at the previos 
					    and the element does
					    not exist */
		    break;
		else
		    this = this->right;
	    } else if (c < 0) {
		if (this->left == NULL) /* Done */
		    return this;
		else
		    this = this->left;
	    } else
		break;
	}
    }
    /* The next element from this... */
    if (this->right != NULL) {
	this = this->right;
	PUSH_NODE(stack,this);
	while (this->left != NULL) {
	    this = this->left;
	    PUSH_NODE(stack, this);
	}
	if (stack->slot > 0) 
	    ++(stack->slot);
    } else {
	do {
	    tmp = POP_NODE(stack);
	    if (( this = TOP_NODE(stack)) == NULL) {
		stack->slot = 0;
		return NULL;
	    }
	} while (this->right == tmp);
	if (stack->slot > 0) 
	    ++(stack->slot);
    }
    return this;
}

static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack,
			     Eterm key, Eterm* key_base)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    if(( this = TOP_NODE(stack)) != NULL) {
	if (!cmp_key_eq(tb,key,key_base,this)) {
	    /* Start from the beginning */
	    stack->pos = stack->slot = 0;
	}
    }
    if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
	if (( this = tb->root ) == NULL)
	    return NULL;
	for (;;) {
	    PUSH_NODE(stack, this);
	    if (( c = cmp_key(tb,key,key_base,this) ) < 0) {
		if (this->left == NULL) /* We are at the next 
					   and the element does
					   not exist */
		    break;
		else
		    this = this->left;
	    } else if (c > 0) {
		if (this->right == NULL) /* Done */
		    return this;
		else
		    this = this->right;
	    } else
		break;
	}
    }
    /* The previous element from this... */
    if (this->left != NULL) {
	this = this->left;
	PUSH_NODE(stack,this);
	while (this->right != NULL) {
	    this = this->right;
	    PUSH_NODE(stack, this);
	}
	if (stack->slot > 0) 
	    --(stack->slot);
    } else {
	do {
	    tmp = POP_NODE(stack);
	    if (( this = TOP_NODE(stack)) == NULL) {
		stack->slot = 0;
		return NULL;
	    }
	} while (this->left == tmp);
	if (stack->slot > 0) 
	    --(stack->slot);
    }
    return this;
}

static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
					 Eterm key)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    /* spool the stack, we have to "re-search" */
    stack->pos = stack->slot = 0;
    if (( this = tb->root ) == NULL)
	return NULL;
    for (;;) {
	PUSH_NODE(stack, this);
	if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl),
				   this->dbterm.tpl) ) >= 0) {
	    if (this->right == NULL) {
		do {
		    tmp = POP_NODE(stack);
		    if (( this = TOP_NODE(stack)) == NULL) {
			return NULL;
		    }
		} while (this->right == tmp);
		return this;
	    } else
		this = this->right;
	} else /*if (c < 0)*/ {
	    if (this->left == NULL) /* Done */
		return this;
	    else
		this = this->left;
	} 
    }
}

static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
					 Eterm key)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    /* spool the stack, we have to "re-search" */
    stack->pos = stack->slot = 0;
    if (( this = tb->root ) == NULL)
	return NULL;
    for (;;) {
	PUSH_NODE(stack, this);
	if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl),
				   this->dbterm.tpl) ) <= 0) {
	    if (this->left == NULL) {
		do {
		    tmp = POP_NODE(stack);
		    if (( this = TOP_NODE(stack)) == NULL) {
			return NULL;
		    }
		} while (this->left == tmp);
		return this;
	    } else
		this = this->left;
	} else /*if (c < 0)*/ {
	    if (this->right == NULL) /* Done */
		return this;
	    else
		this = this->right;
	} 
    }
}


/*
 * Just lookup a node
 */
static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
{
    TreeDbTerm *this;
    Sint res;
    DbTreeStack* stack = get_static_stack(tb);

    if(!stack || EMPTY_NODE(stack)
       || !cmp_key_eq(tb, key, NULL, (this=TOP_NODE(stack)))) {

	this = tb->root;
	while (this != NULL && (res = cmp_key(tb,key,NULL,this)) != 0) {
	    if (res < 0)
		this = this->left;
	    else
		this = this->right;
	}
    }
    if (stack) {
	release_stack(tb,stack);
    }
    return this;
}

/*
 * Lookup a node and return the address of the node pointer in the tree
 */
static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
{
    TreeDbTerm **this;
    Sint res;

    this = &tb->root;
    while ((*this) != NULL && (res = cmp_key(tb, key, NULL, *this)) != 0) {
	if (res < 0)
	    this = &((*this)->left);
	else
	    this = &((*this)->right);
    }
    if (*this == NULL)
	return NULL;
    return this;
}

static int db_lookup_dbterm_tree(DbTable *tbl, Eterm key, DbUpdateHandle* handle)
{
    DbTableTree *tb = &tbl->tree;
    TreeDbTerm **pp = find_node2(tb, key);

    if (pp == NULL) return 0;

    handle->tb = tbl;
    handle->dbterm = &(*pp)->dbterm;
    handle->mustResize = 0;
    handle->bp = (void**) pp;
    handle->new_size = (*pp)->dbterm.size;
#if HALFWORD_HEAP
    handle->abs_vec = NULL;
#endif
    return 1;
}

static void db_finalize_dbterm_tree(DbUpdateHandle* handle)
{
    if (handle->mustResize) {
	TreeDbTerm* oldp = (TreeDbTerm*) *handle->bp;

	db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm));
	reset_static_stack(&handle->tb->tree);

	free_term(&handle->tb->tree, oldp);
    }
#ifdef DEBUG
    handle->dbterm = 0;
#endif
    return;
}   

/*
 * Traverse the tree with a callback function, used by db_match_xxx
 */
static void traverse_backwards(DbTableTree *tb,
			       DbTreeStack* stack,
			       Eterm lastkey, Eterm* lk_base,
			       int (*doit)(DbTableTree *,
					   TreeDbTerm *,
					   void *,
					   int),
			       void *context) 
{
    TreeDbTerm *this, *next;

    if (lastkey == THE_NON_VALUE) {
	stack->pos = stack->slot = 0;
	if (( this = tb->root ) == NULL) {
	    return;
	}
	while (this != NULL) {
	    PUSH_NODE(stack, this);
	    this = this->right;
	}
	this = TOP_NODE(stack);
	next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl),
			 this->dbterm.tpl);
	if (!((*doit)(tb, this, context, 0)))
	    return;
    } else {
	next = find_prev(tb, stack, lastkey, lk_base);
    }

    while ((this = next) != NULL) {
	next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl), this->dbterm.tpl);
	if (!((*doit)(tb, this, context, 0)))
	    return;
    }
}

/*
 * Traverse the tree with a callback function, used by db_match_xxx
 */
static void traverse_forward(DbTableTree *tb,
			     DbTreeStack* stack,
			     Eterm lastkey, Eterm* lk_base,
			     int (*doit)(DbTableTree *,
					 TreeDbTerm *,
					 void *,
					 int),
			     void *context) 
{
    TreeDbTerm *this, *next;

    if (lastkey == THE_NON_VALUE) {
	stack->pos = stack->slot = 0;
	if (( this = tb->root ) == NULL) {
	    return;
	}
	while (this != NULL) {
	    PUSH_NODE(stack, this);
	    this = this->left;
	}
	this = TOP_NODE(stack);
	next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl), this->dbterm.tpl);
	if (!((*doit)(tb, this, context, 1)))
	    return;
    } else {
	next = find_next(tb, stack, lastkey, lk_base);
    }

    while ((this = next) != NULL) {
	next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl), this->dbterm.tpl);
	if (!((*doit)(tb, this, context, 1)))
	    return;
    }
}

/*
 * Returns 0 if not given 1 if given and -1 on no possible match
 * if key is given; *ret is set to point to the object concerned.
 */
static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret, 
		     Eterm *partly_bound)
{
    TreeDbTerm *this;
    Eterm key;

    ASSERT(ret != NULL);
    if (pattern == am_Underscore || db_is_variable(pattern) != -1)
	return 0;
    key = db_getkey(tb->common.keypos, pattern);
    if (is_non_value(key))
	return -1;  /* can't possibly match anything */
    if (!db_has_variable(key)) {   /* Bound key */
	if (( this = find_node(tb, key) ) == NULL) {
	    return -1;
	}
	*ret = this;
	return 1;
    } else if (partly_bound != NULL && key != am_Underscore && 
	       db_is_variable(key) < 0)
	*partly_bound = key;
	
    return 0;
}



static Sint do_cmp_partly_bound(Eterm a, Eterm b, Eterm* b_base, int *done)
{
    Eterm* aa;
    Eterm* bb;
    Eterm a_hdr;
    Eterm b_hdr;
    int i;
    Sint j;

    /* A variable matches anything */
    if (is_atom(a) && (a == am_Underscore || (db_is_variable(a) >= 0))) {
	*done = 1;
	return 0;
    }
    if (is_same(a,NULL,b,b_base))
	return 0;
    
    switch (a & _TAG_PRIMARY_MASK) {
    case TAG_PRIMARY_LIST:
	if (!is_list(b)) {
	    return cmp_rel(a,NULL,b,b_base);
	}
	aa = list_val(a);
	bb = list_val_rel(b,b_base);
	while (1) {
	    if ((j = do_cmp_partly_bound(*aa++, *bb++, b_base, done)) != 0 || *done)
		return j;
	    if (is_same(*aa, NULL, *bb, b_base))
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_cmp_partly_bound(*aa, *bb, b_base, done);
	    aa = list_val(*aa);
	    bb = list_val_rel(*bb,b_base);
	}
    case TAG_PRIMARY_BOXED:
	if ((b & _TAG_PRIMARY_MASK) != TAG_PRIMARY_BOXED) {
	    return cmp_rel(a,NULL,b,b_base);
	}
	a_hdr = ((*boxed_val(a)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
	b_hdr = ((*boxed_val_rel(b,b_base)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
	if (a_hdr != b_hdr) {
	    return cmp_rel(a, NULL, b, b_base);
	}
	if (a_hdr == (_TAG_HEADER_ARITYVAL >> _TAG_PRIMARY_SIZE)) {
	    aa = tuple_val(a);
	    bb = tuple_val_rel(b, b_base);
	    /* compare the arities */
	    i = arityval(*aa);	/* get the arity*/
	    if (i < arityval(*bb)) return(-1);
	    if (i > arityval(*bb)) return(1);
	    while (i--) {
		if ((j = do_cmp_partly_bound(*++aa, *++bb, b_base, done)) != 0
		    || *done) 
		    return j;
	    }
	    return 0;
	}
	/* Drop through */
      default:
	  return cmp_rel(a, NULL, b, b_base);
    }
}

static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key, Eterm* bk_base)
{
    int done = 0;
    Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, bk_base, &done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\ncmp_partly_bound: %T", partly_bound_key);
    if (ret < 0)
	erts_fprintf(stderr," < ");
    else if (ret > 0)
	erts_fprintf(stderr," > ");
    else
	erts_fprintf(stderr," == ");
    erts_fprintf(stderr,"%R\n", bound_key, bk_base);
#endif
    return ret;
}

/*
** For partly_bound debugging....
**
BIF_RETTYPE ets_testnisse_2(BIF_ALIST_2)
BIF_ADECL_2
{
    Eterm r1 = make_small(partly_bound_can_match_lesser(BIF_ARG_1,
							BIF_ARG_2));
    Eterm r2 = make_small(partly_bound_can_match_greater(BIF_ARG_1,
							 BIF_ARG_2));
    Eterm *hp = HAlloc(BIF_P,3);
    Eterm ret;

    ret = TUPLE2(hp,r1,r2);
    BIF_RET(ret);
}
**
*/
static int partly_bound_can_match_lesser(Eterm partly_bound_1, 
					 Eterm partly_bound_2) 
{
    int done = 0;
    int ret = do_partly_bound_can_match_lesser(partly_bound_1, 
					       partly_bound_2, 
					       &done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\npartly_bound_can_match_lesser: %T",partly_bound_1);
    if (ret)
	erts_fprintf(stderr," can match lesser than ");
    else
	erts_fprintf(stderr," can not match lesser than ");
    erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
    return ret;
}

static int partly_bound_can_match_greater(Eterm partly_bound_1, 
					  Eterm partly_bound_2) 
{
    int done = 0;
    int ret = do_partly_bound_can_match_greater(partly_bound_1, 
						partly_bound_2, 
						&done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\npartly_bound_can_match_greater: %T",partly_bound_1);
    if (ret)
	erts_fprintf(stderr," can match greater than ");
    else
	erts_fprintf(stderr," can not match greater than ");
    erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
    return ret;
}

static int do_partly_bound_can_match_lesser(Eterm a, Eterm b, 
					    int *done)
{
    Eterm* aa;
    Eterm* bb;
    Sint i;
    int j;

    if (is_atom(a) && (a == am_Underscore || 
		       (db_is_variable(a) >= 0))) {
	*done = 1;
	if (is_atom(b) && (b == am_Underscore || 
			   (db_is_variable(b) >= 0))) {
	    return 0;
	} else {
	    return 1;
	}
    } else if (is_atom(b) && (b == am_Underscore || 
			      (db_is_variable(b) >= 0))) {
	*done = 1;
	return 0;
    }

    if (a == b)
	return 0;

    if (not_eq_tags(a,b)) {
	*done = 1;
	return (CMP(a, b) < 0) ? 1 : 0;
    }

    /* we now know that tags are the same */
    switch (tag_val_def(a)) {
    case TUPLE_DEF:
	aa = tuple_val(a);
	bb = tuple_val(b);
	/* compare the arities */
	if (arityval(*aa) < arityval(*bb)) return 1;
	if (arityval(*aa) > arityval(*bb)) return 0;
	i = arityval(*aa);	/* get the arity*/
	while (i--) {
	    if ((j = do_partly_bound_can_match_lesser(*++aa, *++bb, 
						      done)) != 0 
		|| *done) 
		return j;
	}
	return 0;
    case LIST_DEF:
	aa = list_val(a);
	bb = list_val(b);
	while (1) {
	    if ((j = do_partly_bound_can_match_lesser(*aa++, *bb++, 
						      done)) != 0 
		|| *done) 
		return j;
	    if (*aa==*bb)
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_partly_bound_can_match_lesser(*aa, *bb, 
							done);
	    aa = list_val(*aa);
	    bb = list_val(*bb);
	}
    default:
	if((i = CMP(a, b)) != 0) {
	    *done = 1;
	}
	return (i < 0) ? 1 : 0;
    }
}

static int do_partly_bound_can_match_greater(Eterm a, Eterm b, 
					    int *done)
{
    Eterm* aa;
    Eterm* bb;
    Sint i;
    int j;

    if (is_atom(a) && (a == am_Underscore || 
		       (db_is_variable(a) >= 0))) {
	*done = 1;
	if (is_atom(b) && (b == am_Underscore || 
			   (db_is_variable(b) >= 0))) {
	    return 0;
	} else {
	    return 1;
	}
    } else if (is_atom(b) && (b == am_Underscore || 
			      (db_is_variable(b) >= 0))) {
	*done = 1;
	return 0;
    }

    if (a == b)
	return 0;

    if (not_eq_tags(a,b)) {
	*done = 1;
	return (CMP(a, b) > 0) ? 1 : 0;
    }

    /* we now know that tags are the same */
    switch (tag_val_def(a)) {
    case TUPLE_DEF:
	aa = tuple_val(a);
	bb = tuple_val(b);
	/* compare the arities */
	if (arityval(*aa) < arityval(*bb)) return 0;
	if (arityval(*aa) > arityval(*bb)) return 1;
	i = arityval(*aa);	/* get the arity*/
	while (i--) {
	    if ((j = do_partly_bound_can_match_greater(*++aa, *++bb, 
						      done)) != 0 
		|| *done) 
		return j;
	}
	return 0;
    case LIST_DEF:
	aa = list_val(a);
	bb = list_val(b);
	while (1) {
	    if ((j = do_partly_bound_can_match_greater(*aa++, *bb++, 
						      done)) != 0 
		|| *done) 
		return j;
	    if (*aa==*bb)
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_partly_bound_can_match_greater(*aa, *bb, 
							done);
	    aa = list_val(*aa);
	    bb = list_val(*bb);
	}
    default:
	if((i = CMP(a, b)) != 0) {
	    *done = 1;
	}
	return (i > 0) ? 1 : 0;
    }
}

/*
 * Callback functions for the different match functions
 */

static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
		       int forward)
{
    struct select_context *sc = (struct select_context *) ptr;
    Eterm ret;
    Eterm* hp;

    sc->lastobj = this->dbterm.tpl;
    
    if (sc->end_condition != NIL && 
	((forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			   this->dbterm.tpl) < 0) ||
	 (!forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			   this->dbterm.tpl) > 0))) {
	return 0;
    }
    ret = db_match_dbterm(&tb->common,sc->p,sc->mp,sc->all_objects,
			  &this->dbterm, &hp, 2);
    if (is_value(ret)) {
	sc->accum = CONS(hp, ret, sc->accum);
    }
    if (MBUF(sc->p)) {
	/*
	 * Force a trap and GC if a heap fragment was created. Many heap fragments
	 * make the GC slow.
	 */
	sc->max = 0;
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
			     int forward)
{
    struct select_count_context *sc = (struct select_count_context *) ptr;
    Eterm ret;

    sc->lastobj = this->dbterm.tpl;
    
    /* Always backwards traversing */
    if (sc->end_condition != NIL && 
	(cmp_partly_bound(sc->end_condition, 
			  GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			  this->dbterm.tpl) > 0)) {
	return 0;
    }
    ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0,
			  &this->dbterm, NULL, 0);
    if (ret == am_true) {
	++(sc->got);
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
			     int forward)
{
    struct select_context *sc = (struct select_context *) ptr;
    Eterm ret;
    Eterm* hp;

    sc->lastobj = this->dbterm.tpl;
    
    if (sc->end_condition != NIL && 
	((forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			   this->dbterm.tpl) < 0) ||
	 (!forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			   this->dbterm.tpl) > 0))) {
	return 0;
    }

    ret = db_match_dbterm(&tb->common, sc->p, sc->mp, sc->all_objects,
			  &this->dbterm, &hp, 2);
    if (is_value(ret)) {
	++(sc->got);
	sc->accum = CONS(hp, ret, sc->accum);
    }
    if (MBUF(sc->p)) {
	/*
	 * Force a trap and GC if a heap fragment was created. Many heap fragments
	 * make the GC slow.
	 */
	sc->max = 0;
    }
    if (--(sc->max) <= 0 || sc->got == sc->chunk_size) {
	return 0;
    }
    return 1;
}


static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
			      int forward)
{
    struct select_delete_context *sc = (struct select_delete_context *) ptr;
    Eterm ret;
    Eterm key;

    if (sc->erase_lastterm)
	free_term(tb, sc->lastterm);
    sc->erase_lastterm = 0;
    sc->lastterm = this;
    
    if (sc->end_condition != NIL && 
	cmp_partly_bound(sc->end_condition, 
			 GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl),
			 this->dbterm.tpl) > 0)
	return 0;
    ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0,
			  &this->dbterm, NULL, 0);
    if (ret == am_true) {
	key = GETKEY(sc->tb, this->dbterm.tpl);
	linkout_tree(sc->tb, key, this->dbterm.tpl);
	sc->erase_lastterm = 1;
	++sc->accum;
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

#ifdef TREE_DEBUG
static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show,
			  TreeDbTerm *t, int offset)
{
    if (t == NULL)
	return;
    do_dump_tree2(tb, to, to_arg, show, t->right, offset + 4);
    if (show) {
	const char* prefix;
	Eterm term;
	if (tb->common.compress) {
	    prefix = "key=";
	    term = GETKEY(tb, t->dbterm.tpl);
	}
	else {
	    prefix = "";
	    term = make_tuple_rel(t->dbterm.tpl,t->dbterm.tpl);
	}
	erts_print(to, to_arg, "%*s%s%R (addr = %p, bal = %d)\n",
		   offset, "", prefix, term, t->dbterm.tpl,
		   t, t->balance);
    }
    do_dump_tree2(tb, to, to_arg, show, t->left, offset + 4); 
}

#endif

#ifdef HARDDEBUG

void db_check_table_tree(DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    check_table_tree(tb, tb->root);
    check_saved_stack(tb);
    check_slot_pos(tb);
}

static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to)
{
    TreeDbTerm *tmp;
    if (t == NULL) 
	return NULL;
    tmp = traverse_until(t->left, current, to);
    if (tmp != NULL)
	return tmp;
    ++(*current);
    if (*current == to)
	return t;
    return traverse_until(t->right, current, to);
}

static void check_slot_pos(DbTableTree *tb)
{
    int pos = 0;
    TreeDbTerm *t;
    if (tb->stack.slot == 0 || tb->stack.pos == 0)
	return;
    t = traverse_until(tb->root, &pos, tb->stack.slot);
    if (t != tb->stack.array[tb->stack.pos - 1]) {
	erts_fprintf(stderr, "Slot position does not correspont with stack, "
		   "element position %d is really 0x%08X, when stack says "
		   "it's 0x%08X\n", tb->stack.slot, t, 
		   tb->stack.array[tb->stack.pos - 1]);
	do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
    }
}
	

static void check_saved_stack(DbTableTree *tb)
{
     TreeDbTerm *t = tb->root;
     DbTreeStack* stack = &tb->static_stack;
     int n = 0;
     if (stack->pos == 0)
	 return;
     if (t != stack->array[0]) {
	 erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
		      stack->array[0], t);
	 do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
	 return;
     }
     while (n < stack->pos) {
	 if (t == NULL) {
	     erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
			" stack depth is %d\n", n);
	     do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
	     return;
	 }
	 n++;
	 if (n < stack->pos) {
	     if (stack->array[n] == t->left)
		 t = t->left;
	     else if (stack->array[n] == t->right)
		 t = t->right;
	     else {
		 erts_fprintf(stderr, "tb->stack[%d] == 0x%08X does not "
			    "represent child pointer in tree!"
			    "(left == 0x%08X, right == 0x%08X\n", 
			    n, tb->stack[n], t->left, t->right);
		 do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
		 return;
	     }
	 }
     }
}

static int check_table_tree(DbTableTree* tb, TreeDbTerm *t)
{
    int lh, rh;
    if (t == NULL)
	return 0;
    lh = check_table_tree(tb, t->left);
    rh = check_table_tree(tb, t->right);
    if ((rh - lh) != t->balance) {
	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
	erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
		     t->balance, t->left, t->right);
	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
	do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, t, 0);
	erts_fprintf(stderr,"\n---------------------------------\n");
    }
    return ((rh > lh) ? rh : lh) + 1;
}
	
#endif