aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_db_tree.c
blob: f643344477a2d219e1eaf316282038fe400f5657 (plain) (tree)
1
2
3
4
5
6
7
8
9

                   
  
                                                        
  


                                                                   
  






                                                                           
  
































                                                                        
                             
 
                                                          
                                                                    
 

                                      





                                                          
 
                                               




                                                       
                                                                 
 
                                                                      







                                                                   
                                                                            

                       


                                                                     
     
                                                






                                                                                  
                                                                                        
 


                                                                             

          
                                           



                                                                                             
                                                       
 



                        

 
                                                           
 


                                       

 
                                                                       

                  

                                                                           

          
                                                                      


             
                                                                                 



                                                        

                                                                                     

          
                                                                                


             

  










                                                                       













                                                

                                                                       







                         







                                                                   









                                                                          


                                                                         













                                                                













                             







                              


                       









                         



                               

                      




                        


                  


                                                                                         
  

                        



                                                                             
                                                                


                                                  
                                      
















                                                                                
                                            
                                             
                                                           



                                                        

                                               
                                          
                                           
                                                           


                                                      
                                            

                                                        

                                                    
                                                                    



                                                                       

                                                                         

                                                                      
 
                                                                                

                                                                                           
                                         


                                        
                                               


                                              
                                               


                                              
                                                


                                               
                                                 


                                                     








                                                                 

                                                   





















                                                                       
                                                              
                                                                   
                                                                    
                                                            
                                                                    





                                                                         
                                                                     


                                                                          
                                                                      


                                                                           
                                                              
                                                   
                                                  
                                                  
 
                                                              




                                                                   
                                                                              



                                              




                                                                 






























                                                  

                                    
                 
                               
                             


                                










                             

                                                                         















                                                                              
                                                



                         

                                                                    
 

                       
 
                                  


                             
                                        
                                                              








                                          
                                                 
     
                                              


                         
                                                              

                                 






                                                           
                     


                                      
                                                     



                             
                                              


                         
                                                                        

                                 








                                                                         

                       
 
                                  


                             
                                         
                                                              







                                          

                                                 
     
                                              


                         
                                                             

                                 





                                                                              
                     


                                      
                                                     



                             


                                              
 






                                                                        
 
 
                                                                                  
                                               
                                                           

 

                                                                        
 





                                                                     
                             






                                     
                                        




                                           

                                                                             

                                       
                                        


                                                  
                                                       
                          







                                                                                 
                                                   








































































                                                                 
                                                                   

                                 





                                                                                  
               
                     







                                                                      
                                                  



                                              
                                          
                                                                        
                                   

                            



                         
                                                                       

                                 

                                                                      
 



                                                                                     


                         
                                                              

                                 





                                                                                          












                                                                       
                                                  


                               


                                                   
                                                                            



                         

                                                               

                                 






                                                                                



                    

                                                                      



                         
                                                             

                                 





                                                                              



                    

                                                                                           



                         
                                                                       

                                 






                                                                         

                   
                     











                                                                       
                             

                                 
                              








                                                                      
                                                           




                                        
                                    
                                                                            
                               

                        


                         






                                                                     

 
                                                  
 



                         
























































                                                                    






                                                                
 






















                                                                             
                             





                                                                      

                                                








                                          
                           



                                    
                                                         

                      
                                                                                  
                
                                                                                


                      
                                                                          
                
                                                                            

         
                                                      




















                                                                           
                                  
                                   
                                                     



















                                                               

                                                                     










                                                                 

                                                                     




                                               
                          
                           
                                             














                                                                         















                                                                              
 


                                                                            
 
                                                                                


                             
                                  























                                                 
                           


                      
                                                                                      








                                           

                                                   
                                     
                                                                              


                                           
                                                        

                                  
                                                                                      



                                                       
                                                                      

                                  
                                                                                     



                                                       
                                                                        
     
                                                      









                                                      
                          
                                                       
                                             
                                                   


                         
             
















                                                                          

                                                                 

                                 










                                                                      






















                                                                             
                                                                          


                            

                                                






                                          
                           





                                        


                                                                          







                                                               
                                                    



                                                     
                          








                                                    
                                             












                                                                           






                                                            

                                 








                                                                                 


                                   
                                  























                                                 
                           

               
                                                                                      








                                                  

                                                   
                                     
                                                                 


                                                              
                                                         
                              
                                                                                 




                                                   

                                                                          





                                                              
                          
                               
                                                           


                                  
                                                                                


                                       
                                             
                                                    


                         
             












                                                                          

                                                                    

                                 









                                                                                 


                             
                                  























                                                 
                           


                               
                                                                                      








                                           

                                                   
                                     
                                                                               







                                                                 
                                                        

                                  
                                                                                     



                                                       
                                                                              

                                  
                                                                                      



                                                       
                                                                            
     
                                                      





















                                                                      
                              
                                                           
                                                 
                                                        


                             
                 














                                                          
                          
                                                       
                                             
 
                                                    

                         
             














                                                                          



                                                                    

                                 












                                                                 












                                                 
                                                         















                                                                         
                                                          
             


                         







                                          
                                   
 
                                                                                     





                                                                  
                                                          
                                
                                                                    


                                                                
                          








                                                    
                                             












                                                                            



                                                             

                                 











                                                                    

                                    
                                  













                                                 
                                                  












                                                 



                                   
    
                                                                                                










                                                   
                                     
                                                                                    




                                                                 

                                                                                           



                                     
                                                                                     





                                                                 
                                                          
                          
                                 
                                                           


                                       
                                                                                


                                            
                                             
                                                    


                         
             






                                                                          
                                    







                                                                        

                                                                     

                                 










                                                                             



























                                                                             
                                                          





                           
                           






                                             


                                                                                   





































                                                                             



                                                              

                                 







                                                                                   


























                                                 
                   

                           
                           

                    
                                                                                                    








                                                
 
                                              







                                                                
                                                               




                                                                   
                                                            

                              
                                                                                 




                                                   

                                                                                   








                                                                          
                                                           


                                            
                                                                                



                                                 
                                                    


                         
             












                                                                          

                                                                      

                                 







                                                                         


                     
                                                        




                                              
                                                    



                                                                    
                             



                         






                                                                        



                                                             

                                                                   
 



                                                                         
                                                           



                                                                         
                                                                                  


      








                                                          
                                                   
                                                 
 
                                   
                                                                



             
                                                                  

                                 





                                               

                                                      



                                                        



                                                             
     
                

 
                                                                             
 


                                                  
                           
                                                     
                





                                                                      






                                                                       



                                                                       
                                                              












                                                              
                           


                                                      



                                              


                                                       

                                                                     




                                    
                             









                                                                    
                       



                                   
                                                       



















                                                        
                                             





                                                          
                                            
                
                                             




             

                                                                             





                                    
                             













                                                                    
                              



                                   
                                                     







                                                                        
                                                     













                                          
                                             





                                                          
                                            
                
                                             








                                                                       
                                                                               

                                                                                           

















                                    

















                                                                            


                    
 













                                                   

                                           
                                   
 
                                                                                         





                                                   

                                                           



                           
                                                                         












































                                                                             
                                                               
















                                                   
                                              



                                                            


             
                

 


                   
                                         











































                                               
                                          






































































                                                  
                                  







                     


                                                            


                     
                                                           











                                                                 
                    















































                                                           
                                            






                                       

                                                                 




                                           
                                       




                                                            
                                    


                                   
                                                   





                                                                  
                               































                                                    

                                                                 




                                           
                                       




                                                            
                                    


                                   
                                                   





                                                               
                               































                                                    

                                                                             






                                                 
                                


                               
                                                                             


















                                                            

                                                                             






                                                 
                                


                               
                                                                             






















                                                            

                                                                     


                     
                                                           
 
                                  
                                                         
 
                    
                                                                   





                                   
                
                                                          
     





                                                                       
                                                                               



                      
                
                                                                     









                                     




                                                                           

                                                                    











                                                            
                                    
























                                                               
                                               






                                


                                                                              
 
                                                          









                                         
                                                


                                                                
                                           

                                   

                                                                               


                         
                                                     




                                    


                                    
                          

                                          


             


                                                                     
 
                                 






                                                                                 
                                                
 

                                                                 
                                                                       
                                                
                                                                
                                            
 
                           






                       







                                                         


                                                                   

                                                 
                                                  
                                             
                                                           






                                                        
                                   
                                     
                                       






                                   
                                                                         


                                             
                                                    


                                   
                                                                         







                                                                   

                                               
                                                
                                           
                                                         






                                                      
                                   
                                     
                                       






                                   
                                                                         


                                             
                                                    


                                   
                                                                         





                                             

                                                                                

                                                        

                                                         
                                                                 









                                                               
                                       






                                   
                                                   


                                                
                                                                                


                   
                                                    


                                   
                                                   


                                                
                                                                                





                   


                                                                 

                                                                         
 
                      




                                                                  
                                         


                                                       
                                                           

                      
                    

                                                               
                                                           






                            
                                                            












                                                                         
                     




                                    
                            

                         
                         
                   
                                                                            
                         
                                  

                                                     
                                                           
                               
                               


                                                           
                            

                                                                          
                                                                          
                             
                            


                                                                   
                              




                                                  
                                                                      






                              
                          


     
                                                                       
                 
                                                                       







                                                                    
                                           

































                                                                              
                                                          
















                                                                               
                                                           
































                                                              
                                       


































                                                                    
                                  

































                                                               
                                       


































                                                                     
                                  









                                                       
                                                                      



                                                              
              





                                              
                                                                                 

                                              
                                                                                  

                 
                                                                   
                        














                                                                                  
                                                                            



                                                                          





                                             
                                                                                

                 
                                                                     








                           
                                                                            



                                                              
              





                                              
                                                                                 

                                              
                                                                                  


                 
                                                                    
                        
                    















                                                                                  
                                                                             



                                                                            


                           
                                              




                                            
                                                                            
                 
                                                                     

                                               
                                                       








                               
                                                                               


                                                                              
              








                                                                                   
                                                                        
 
                        

                                
            
                                               

                                           
      





                                      
                                     







                           
                 
                                                                            
                                                    

                  

                                                              
               

                           
                           




                                             
                                             
         

                                                                 
     
                                                              





                


                                              


                                      
                                   





























                                                                             
                                                                            













                                                                          
                                                                             





                                                                              
                                                                                 












                                                                         
                                                                                     





                        
                                                           



                  

                                        

                                                                      

                                                                            
                                                                            
                                                                     





                                                                     
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 1998-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
** Implementation of ordered ETS tables.
** The tables are implemented as AVL trees (Published by Adelson-Velski 
** and Landis). A nice source for learning about these trees is
** Wirth's Algorithms + Datastructures = Programs.
** The implementation here is however not made with recursion
** as the examples in Wirths book are.
*/

/*
#ifdef DEBUG
#define HARDDEBUG 1
#endif
*/
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "erl_binary.h"

#include "erl_db_tree.h"
#include "erl_db_tree_util.h"

#define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))

#define TREE_MAX_ELEMENTS 0xFFFFFFFFUL

#define TOPN_NODE(Dtt, Pos)                   \
     (((Pos) < Dtt->pos) ? 			\
      (Dtt)->array[(Dtt)->pos - ((Pos) + 1)] : NULL)

#define REPLACE_TOP_NODE(Dtt, Node)          \
     if ((Dtt)->pos) (Dtt)->array[(Dtt)->pos - 1] = (Node)

#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL)


/* Obtain table static stack if available. NULL if not.
** Must be released with release_stack()
*/
ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
{
    if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
	return &tb->static_stack;
    }
    return NULL;
}

/* Obtain static stack if available, otherwise empty dynamic stack.
** Must be released with release_stack()
*/
static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
{
    DbTreeStack* stack;
    if (stack_container != NULL &&
        !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) {
	return &stack_container->static_stack;
    }
    stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
			  sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
    stack->pos = 0;
    stack->slot = 0;
    stack->array = (TreeDbTerm**) (stack + 1);
    return stack;
}

static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
{
    if (stack_container != NULL && stack == &stack_container->static_stack) {
	ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
	erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
    }
    else {
	erts_db_free(ERTS_ALC_T_DB_STK, tb,
		     (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
    }
}

static ERTS_INLINE void reset_stack(DbTreeStack* stack)
{
    if (stack != NULL) {
        stack->pos = 0;
        stack->slot = 0;
    }
}

static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
    if (tb != NULL) {
        reset_stack(&tb->static_stack);
    }
}

static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableCommon *tb, Eterm obj)
{
    TreeDbTerm* p;
    if (tb->compress) {
	p = db_store_term_comp(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
    }
    else {
	p = db_store_term(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
    }
    return p;
}
static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableCommon *tb, TreeDbTerm* old,
					      Eterm obj)
{
    TreeDbTerm* p;
    ASSERT(old != NULL);
    if (tb->compress) {
	p = db_store_term_comp(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
    }
    else {
	p = db_store_term(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
    }
    return p;
}

/*
 * Number of records to delete before trapping.
 */
#define DELETE_RECORD_LIMIT 12000

/* 
** Debugging
*/
#ifdef HARDDEBUG
static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to);
static void check_slot_pos(DbTableTree *tb);
static void check_saved_stack(DbTableTree *tb);

#define TREE_DEBUG
#endif

#ifdef TREE_DEBUG
/*
** Primitive trace macro
*/
#define DBG erts_fprintf(stderr,"%d\n",__LINE__)

/*
** Debugging dump
*/

static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
			  TreeDbTerm *t, int offset);

#else

#define DBG /* nothing */

#endif

/*
** Datatypes
*/

/* 
 * This structure is filled in by analyze_pattern() for the select 
 * functions.
 */
struct mp_info {
    int something_can_match;	/* The match_spec is not "impossible" */
    int some_limitation;	/* There is some limitation on the search
				 * area, i. e. least and/or most is set.*/
    int got_partial;		/* The limitation has a partially bound
				 * key */
    Eterm least;		/* The lowest matching key (possibly 
				 * partially bound expression) */
    Eterm most;                 /* The highest matching key (possibly 
				 * partially bound expression) */

    TreeDbTerm **save_term;      /* If the key is completely bound, this
	 			  * will be the Tree node we're searching
				  * for, otherwise it will be useless */
    Binary *mp;                 /* The compiled match program */
};

/*
 * Used by doit_select(_chunk)
 */
struct select_context {
    Process *p;
    Eterm accum;
    Binary *mp;
    Eterm end_condition;
    Eterm *lastobj;
    Sint32 max;
    int keypos;
    Sint got;
    Sint chunk_size;
};

/*
 * Used by doit_select_count
 */
struct select_count_context {
    Process *p;
    Binary *mp;
    Eterm end_condition;
    Eterm *lastobj;
    Sint32 max;
    int keypos;
    Sint got;
};

/*
 * Used by doit_select_delete
 */
struct select_delete_context {
    Process *p;
    DbTableCommon *tb;
    TreeDbTerm **root;
    DbTreeStack *stack;
    Uint accum;
    Binary *mp;
    Eterm end_condition;
    int erase_lastterm;
    TreeDbTerm *lastterm;
    Sint32 max;
    int keypos;
};

/*
 * Used by doit_select_replace
 */
struct select_replace_context {
    Process *p;
    DbTableCommon *tb;
    TreeDbTerm **root;
    Binary *mp;
    Eterm end_condition;
    Eterm *lastobj;
    Sint32 max;
    int keypos;
    Sint replaced;
};

/* Used by select_replace on analyze_pattern */
typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);

/*
** Forward declarations 
*/
static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
                                Eterm key, DbTreeStack *stack);
static TreeDbTerm *linkout_object_tree(DbTableCommon *tb,  TreeDbTerm **root,
				       Eterm object, DbTableTree *stack);
static SWord do_free_tree_continue(DbTableTree *tb, SWord reds);
static void free_term(DbTable *tb, TreeDbTerm* p);
int tree_balance_left(TreeDbTerm **this); 
int tree_balance_right(TreeDbTerm **this); 
static int delsub(TreeDbTerm **this); 
static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
                               DbTable *tb, DbTableTree *stack_container);
static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
                             Eterm key, DbTableTree *stack_container);
static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
                             DbTreeStack *stack, TreeDbTerm *this);
static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
                             DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
                             DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
                                         DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
                                         DbTreeStack* stack, Eterm key);
static void traverse_backwards(DbTableCommon *tb,
                               TreeDbTerm **root,
			       DbTreeStack*,
			       Eterm lastkey,
			       int (*doit)(DbTableCommon *,
					   TreeDbTerm *,
					   void *,
					   int),
			       void *context); 
static void traverse_forward(DbTableCommon *tb,
                             TreeDbTerm **root,
			     DbTreeStack*,
			     Eterm lastkey,
			     int (*doit)(DbTableCommon *tb,
					 TreeDbTerm *,
					 void *,
					 int),
			     void *context);
static void traverse_update_backwards(DbTableCommon *tb,
                                      TreeDbTerm **root,
                                      DbTreeStack*,
                                      Eterm lastkey,
                                      int (*doit)(DbTableCommon *tb,
                                                  TreeDbTerm **, // out
                                                  void *,
                                                  int),
                                      void *context);
static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
                     TreeDbTerm ***ret, Eterm *partly_bound);
static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);

static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, 
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi);
static int doit_select(DbTableCommon *tb,
		       TreeDbTerm *this,
		       void *ptr,
		       int forward);
static int doit_select_count(DbTableCommon *tb,
			     TreeDbTerm *this,
			     void *ptr,
			     int forward);
static int doit_select_chunk(DbTableCommon *tb,
			     TreeDbTerm *this,
			     void *ptr,
			     int forward);
static int doit_select_delete(DbTableCommon *tb,
			      TreeDbTerm *this,
			      void *ptr,
			      int forward);
static int doit_select_replace(DbTableCommon *tb,
                               TreeDbTerm **this_ptr,
                               void *ptr,
                               int forward);

static int partly_bound_can_match_lesser(Eterm partly_bound_1, 
					 Eterm partly_bound_2);
static int partly_bound_can_match_greater(Eterm partly_bound_1, 
					  Eterm partly_bound_2); 
static int do_partly_bound_can_match_lesser(Eterm a, Eterm b, 
					    int *done);
static int do_partly_bound_can_match_greater(Eterm a, Eterm b, 
					     int *done);
static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3);


/* Method interface functions */
static int db_first_tree(Process *p, DbTable *tbl, 
		  Eterm *ret);
static int db_next_tree(Process *p, DbTable *tbl, 
			Eterm key, Eterm *ret);
static int db_last_tree(Process *p, DbTable *tbl, 
			Eterm *ret);
static int db_prev_tree(Process *p, DbTable *tbl, 
			Eterm key,
			Eterm *ret);
static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail);
static int db_get_tree(Process *p, DbTable *tbl, 
		       Eterm key,  Eterm *ret);
static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_tree(Process *p, DbTable *tbl, 
			       Eterm key,int ndex,
			       Eterm *ret);
static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_tree(Process *p, DbTable *tbl, 
			Eterm slot_term,  Eterm *ret);
static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
			  Eterm pattern, int reversed, Eterm *ret);
static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
				Eterm pattern,  Eterm *ret);
static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
				Eterm pattern, Sint chunk_size,
				int reversed, Eterm *ret);
static int db_select_continue_tree(Process *p, DbTable *tbl,
				   Eterm continuation, Eterm *ret);
static int db_select_count_continue_tree(Process *p, DbTable *tbl,
					 Eterm continuation, Eterm *ret);
static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
				 Eterm pattern,  Eterm *ret);
static int db_select_delete_continue_tree(Process *p, DbTable *tbl, 
					  Eterm continuation, Eterm *ret);
static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret);
static int db_select_replace_continue_tree(Process *p, DbTable *tbl,
                                           Eterm continuation, Eterm *ret);
static int db_take_tree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_tree(fmtfn_t to, void *to_arg,
			  int show, DbTable *tbl);
static int db_free_empty_table_tree(DbTable *tbl);

static SWord db_free_table_continue_tree(DbTable *tbl, SWord);

static void db_foreach_offheap_tree(DbTable *,
				    void (*)(ErlOffHeap *, void *),
				    void *);

static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds);

#ifdef HARDDEBUG
static void db_check_table_tree(DbTable *tbl);
#endif
static int
db_lookup_dbterm_tree(Process *, DbTable *, Eterm key, Eterm obj,
                      DbUpdateHandle*);
static void
db_finalize_dbterm_tree(int cret, DbUpdateHandle *);

/*
** Static variables
*/

Export ets_select_reverse_exp;

/*
** External interface 
*/
DbTableMethod db_tree =
{
    db_create_tree,
    db_first_tree,
    db_next_tree,
    db_last_tree,
    db_prev_tree,
    db_put_tree,
    db_get_tree,
    db_get_element_tree,
    db_member_tree,
    db_erase_tree,
    db_erase_object_tree,
    db_slot_tree,
    db_select_chunk_tree,
    db_select_tree, /* why not chunk size=0 ??? */
    db_select_delete_tree,
    db_select_continue_tree,
    db_select_delete_continue_tree,
    db_select_count_tree,
    db_select_count_continue_tree,
    db_select_replace_tree,
    db_select_replace_continue_tree,
    db_take_tree,
    db_delete_all_objects_tree,
    db_free_empty_table_tree,
    db_free_table_continue_tree,
    db_print_tree,
    db_foreach_offheap_tree,
    db_lookup_dbterm_tree,
    db_finalize_dbterm_tree

};





void db_initialize_tree(void)
{
    erts_init_trap_export(&ets_select_reverse_exp, am_ets, am_reverse, 3,
			  &ets_select_reverse);
    return;
};

/*
** Table interface routines ie what's called by the bif's 
*/

int db_create_tree(Process *p, DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    tb->root = NULL;
    tb->static_stack.array = erts_db_alloc(ERTS_ALC_T_DB_STK,
					   (DbTable *) tb,
					   sizeof(TreeDbTerm *) * STACK_NEED);
    tb->static_stack.pos = 0;
    tb->static_stack.slot = 0;
    erts_atomic_init_nob(&tb->is_stack_busy, 0);
    tb->deletion = 0;
    return DB_ERROR_NONE;
}

int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
                         Eterm *ret, DbTableTree *stack_container)
{
    DbTreeStack* stack;
    TreeDbTerm *this;

    if (( this = root ) == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    /* Walk down the tree to the left */
    if ((stack = get_static_stack(stack_container)) != NULL) {
	stack->pos = stack->slot = 0;
    }
    while (this->left != NULL) {
	if (stack) PUSH_NODE(stack, this);
	this = this->left;
    }
    if (stack) {
	PUSH_NODE(stack, this);
	stack->slot = 1;
	release_stack(tbl,stack_container,stack);
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_first_tree_common(p, tbl, tb->root, ret, tb);
}

int db_next_tree_common(Process *p, DbTable *tbl,
                        TreeDbTerm *root, Eterm key,
                        Eterm *ret, DbTreeStack* stack)
{
    TreeDbTerm *this;

    if (is_atom(key) && key == am_EOT)
	return DB_ERROR_BADKEY;
    this = find_next(&tbl->common, root, stack, key);
    if (this == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack = get_any_stack(tbl, tb);
    int ret_val = db_next_tree_common(p, tbl, tb->root, key, ret, stack);
    release_stack(tbl,tb,stack);
    return ret_val;
}

int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
                        Eterm *ret, DbTableTree *stack_container)
{
    TreeDbTerm *this;
    DbTreeStack* stack;

    if (( this = root ) == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    /* Walk down the tree to the right */
    if ((stack = get_static_stack(stack_container)) != NULL) {
	stack->pos = stack->slot = 0;
    }    
    while (this->right != NULL) {
	if (stack) PUSH_NODE(stack, this);
	this = this->right;
    }
    if (stack) {
	PUSH_NODE(stack, this);
	stack->slot = NITEMS(tbl);
	release_stack(tbl,stack_container,stack);
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_last_tree_common(p, tbl, tb->root, ret, tb);
}

int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
                 Eterm *ret, DbTreeStack* stack)
{
    TreeDbTerm *this;

    if (is_atom(key) && key == am_EOT)
	return DB_ERROR_BADKEY;
    this = find_prev(&tbl->common, root, stack, key);
    if (this == NULL) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }
    *ret = db_copy_key(p, tbl, &this->dbterm);
    return DB_ERROR_NONE;
}

static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    DbTreeStack* stack = get_any_stack(tbl, tb);
    int res = db_prev_tree_common(p, tbl, tb->root, key, ret, stack);
    release_stack(tbl,tb,stack);
    return res;
}

static ERTS_INLINE int cmp_key_eq(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) {
    Eterm obj_key = GETKEY(tb,obj->dbterm.tpl);
    return is_same(key, obj_key) || CMP(key, obj_key) == 0;
}

int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj,
                       int key_clash_fail, DbTableTree *stack_container)
{
    /* Non recursive insertion in AVL tree, building our own stack */
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = root;
    Sint c;
    Eterm key;
    int dir;
    TreeDbTerm *p1, *p2, *p;

    key = GETKEY(tb, tuple_val(obj));

    reset_static_stack(stack_container);

    dstack[dpos++] = DIR_END;
    for (;;)
	if (!*this) { /* Found our place */
	    state = 1;
	    if (erts_atomic_inc_read_nob(&tb->nitems) >= TREE_MAX_ELEMENTS) {
		erts_atomic_dec_nob(&tb->nitems);
		return DB_ERROR_SYSRES;
	    }
	    *this = new_dbterm(tb, obj);
	    (*this)->balance = 0;
	    (*this)->left = (*this)->right = NULL;
	    break;
	} else if ((c = cmp_key(tb, key, *this)) < 0) {
	    /* go lefts */
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else if (!key_clash_fail) { /* Equal key and this is a set, replace. */
	    *this = replace_dbterm(tb, *this, obj);
	    break;
	} else {
	    return DB_ERROR_BADKEY; /* key already exists */
	}

    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	p = *this;
	if (dir == DIR_LEFT) {
	    switch (p->balance) {
	    case 1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = -1;
		break;
	    case -1: /* The icky case */
		p1 = p->left;
		if (p1->balance == -1) { /* Single LL rotation */
		    p->left = p1->right;
		    p1->right = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RR rotation */
		    p2 = p1->right;
		    p1->right = p2->left;
		    p2->left = p1;
		    p->left = p2->right;
		    p2->right = p;
		    p->balance = (p2->balance == -1) ? +1 : 0;
		    p1->balance = (p2->balance == 1) ? -1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	} else { /* dir == DIR_RIGHT */
	    switch (p->balance) {
	    case -1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = 1;
		break;
	    case 1:
		p1 = p->right;
		if (p1->balance == 1) { /* Single RR rotation */
		    p->right = p1->left;
		    p1->left = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RL rotation */
		    p2 = p1->left;
		    p1->left = p2->right;
		    p2->right = p1;
		    p->right = p2->left;
		    p2->left = p;
		    p->balance = (p2->balance == 1) ? -1 : 0;
		    p1->balance = (p2->balance == -1) ? 1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0; 
		state = 0;
		break;
	    }
	}
    }
    return DB_ERROR_NONE;
}

static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableTree *tb = &tbl->tree;
    return db_put_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb);
}

int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
                       Eterm *ret, DbTableTree *stack_container)
{
    Eterm copy;
    Eterm *hp, *hend;
    TreeDbTerm *this;

    /*
     * This is always a set, so we know exactly how large
     * the data is when we have found it.
     * The list created around it is purely for interface conformance.
     */
    
    this = find_node(tb,root,key,stack_container);
    if (this == NULL) {
	*ret = NIL;
    } else {
	hp = HAlloc(p, this->dbterm.size + 2);
	hend = hp + this->dbterm.size + 2;
	copy = db_copy_object_from_ets(tb, &this->dbterm, &hp, &MSO(p));
	*ret = CONS(hp, copy, NIL);
	hp += 2;
	HRelease(p,hend,hp);
    }
    return DB_ERROR_NONE;
}

static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_get_tree_common(p, &tb->common, tb->root, key, ret, tb);
}

int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret,
                          DbTableTree *stack_container)
{
    *ret = (find_node(tb,root,key,stack_container) == NULL) ? am_false : am_true;
    return DB_ERROR_NONE;
}

static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_member_tree_common(&tb->common, tb->root, key, ret, tb);
}

int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
                               int ndex, Eterm *ret, DbTableTree *stack_container)
{
    /*
     * Look the node up:
     */
    Eterm *hp;
    TreeDbTerm *this;

    /*
     * This is always a set, so we know exactly how large
     * the data is when we have found it.
     * No list is created around elements in set's so there are no list
     * around the element here either.
     */
    
    this = find_node(tb,root,key,stack_container);
    if (this == NULL) {
	return DB_ERROR_BADKEY;
    } else {
	if (ndex > arityval(this->dbterm.tpl[0])) {
	    return DB_ERROR_BADPARAM;
	}
	*ret = db_copy_element_from_ets(tb, p, &this->dbterm, ndex, &hp, 0);
    }
    return DB_ERROR_NONE;
}

static int db_get_element_tree(Process *p, DbTable *tbl,
			       Eterm key, int ndex, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_get_element_tree_common(p, &tb->common, tb->root, key,
                                      ndex, ret, tb);
}

int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret,
                         DbTreeStack *stack /* NULL if no static stack */)
{
    TreeDbTerm *res;

    *ret = am_true;

    if ((res = linkout_tree(&tbl->common, root,key, stack)) != NULL) {
	free_term(tbl, res);
    }
    return DB_ERROR_NONE;
}

static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_erase_tree_common(tbl, &tb->root, key, ret, &tb->static_stack);
}

int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
                                Eterm *ret, DbTableTree *stack_container)
{
    TreeDbTerm *res;

    *ret = am_true;

    if ((res = linkout_object_tree(&tbl->common, root, object, stack_container)) != NULL) {
	free_term(tbl, res);
    }
    return DB_ERROR_NONE;
}

static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return  db_erase_object_tree_common(tbl, &tb->root, object, ret, tb);
}

int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
                        Eterm slot_term, Eterm *ret,
                        DbTableTree *stack_container)
{
    Sint slot;
    TreeDbTerm *st;
    Eterm *hp, *hend;
    Eterm copy;

    /*
     * The notion of a "slot" is not natural in a tree, but we try to
     * simulate it by giving the n'th node in the tree instead.
     * Traversing a tree in this way is not very convenient, but by
     * using the saved stack we at least sometimes will get acceptable 
     * performance.
     */

    if (is_not_small(slot_term) ||
	((slot = signed_val(slot_term)) < 0) ||
	(slot > NITEMS(tbl)))
	return DB_ERROR_BADPARAM;

    if (slot == NITEMS(tbl)) {
	*ret = am_EOT;
	return DB_ERROR_NONE;
    }

    /* 
     * We use the slot position and search from there, slot positions 
     * are counted from 1 and up.
     */
    ++slot;
    st = slot_search(p, root, slot, tbl, stack_container); 
    if (st == NULL) {
	*ret = am_false;
	return DB_ERROR_UNSPEC;
    }
    hp = HAlloc(p, st->dbterm.size + 2);
    hend = hp + st->dbterm.size + 2;
    copy = db_copy_object_from_ets(&tbl->common, &st->dbterm, &hp, &MSO(p));
    *ret = CONS(hp, copy, NIL);
    hp += 2;
    HRelease(p,hend,hp);
    return DB_ERROR_NONE;
}

static int db_slot_tree(Process *p, DbTable *tbl, 
			Eterm slot_term, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb);
}



static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3)
{
    Process *p = BIF_P;
    Eterm a1 = BIF_ARG_1;
    Eterm a2 = BIF_ARG_2;
    Eterm a3 = BIF_ARG_3;
    Eterm list;
    Eterm result;
    Eterm* hp;
    Eterm* hend;

    int max_iter = CONTEXT_REDS * 10;

    if (is_nil(a1)) {
	hp = HAlloc(p, 3);
	BIF_RET(TUPLE2(hp,a2,a3));
    } else if (is_not_list(a1)) {
    error:
	BIF_ERROR(p, BADARG);
    }
    
    list = a1;
    result = a2;
    hp = hend = NULL;
    while (is_list(list)) {
	Eterm* pair = list_val(list);
	if (--max_iter == 0) {
	    BUMP_ALL_REDS(p);
	    HRelease(p, hend, hp);
	    BIF_TRAP3(&ets_select_reverse_exp, p, list, result, a3);
	}
	if (hp == hend) {
	    hp = HAlloc(p, 64);
	    hend = hp + 64;
	}
	result = CONS(hp, CAR(pair), result);
	hp += 2;
	list = CDR(pair);
    }
    if (is_not_nil(list))  {
	goto error;
    }
    HRelease(p, hend, hp);
    BUMP_REDS(p,CONTEXT_REDS - max_iter / 10);
    hp = HAlloc(p,3);
    BIF_RET(TUPLE2(hp, result, a3));
}

static BIF_RETTYPE bif_trap1(Export *bif,
			     Process *p, 
			     Eterm p1) 
{
    BIF_TRAP1(bif, p, p1);
}
    
static BIF_RETTYPE bif_trap3(Export *bif,
			     Process *p, 
			     Eterm p1, 
			     Eterm p2,
			     Eterm p3) 
{
    BIF_TRAP3(bif, p, p1, p2, p3);
}

int db_select_continue_tree_common(Process *p, 
                                   DbTableCommon *tb,
                                   TreeDbTerm **root,
                                   Eterm continuation,
                                   Eterm *ret,
                                   DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Sint chunk_size;
    Sint reverse;


#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);

    /* Decode continuation. We know it's a tuple but not the arity or 
       anything else */

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 8)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    
    if (!is_small(tptr[4]) ||
	!(is_list(tptr[6]) || tptr[6] == NIL) || !is_small(tptr[7]) ||
	!is_small(tptr[8]))
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    
    lastkey = tptr[2];
    end_condition = tptr[3];
    mp = erts_db_get_match_prog_binary(tptr[5]);
    if (!mp)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
    chunk_size = signed_val(tptr[4]);

    sc.p = p;
    sc.accum = tptr[6];
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.lastobj = NULL;
    sc.max = 1000;
    sc.keypos = tb->keypos;
    sc.chunk_size = chunk_size;
    reverse = unsigned_val(tptr[7]);
    sc.got = signed_val(tptr[8]);

    stack = get_any_stack((DbTable*)tb, stack_container);
    if (chunk_size) {
	if (reverse) {
	    traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
	} else {
	    traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
	}
    } else {
	if (reverse) {
	    traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
	} else {
	    traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
	}
    }
    release_stack((DbTable*)tb,stack_container,stack);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
	if (chunk_size) {
	    Eterm *hp; 
	    unsigned sz;

	    if (sc.got < chunk_size || sc.lastobj == NULL) { 
		/* end of table, sc.lastobj may be NULL as we may have been
		   at the very last object in the table when trapping. */
		if (!sc.got) {
		    RET_TO_BIF(am_EOT, DB_ERROR_NONE);
		} else {
		    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
					 sc.accum, NIL, am_EOT), 
			       DB_ERROR_NONE);
		}
	    }

	    key = GETKEY(tb, sc.lastobj);
	    sz = size_object(key);
	    hp = HAlloc(p, 9 + sz);
	    key = copy_struct(key, sz, &hp, &MSO(p));
	    continuation = TUPLE8
		(hp,
		 tptr[1],
		 key,
		 tptr[3], 
		 tptr[4],
		 tptr[5],
		 NIL,
		 tptr[7],
		 make_small(0));
	    RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
				 sc.accum, NIL, continuation), 
		       DB_ERROR_NONE);
	} else {
	    RET_TO_BIF(sc.accum, DB_ERROR_NONE);
	}
    }	
    key = GETKEY(tb, sc.lastobj);
    if (chunk_size) {
	if (end_condition != NIL && 
	    ((!reverse && cmp_partly_bound(end_condition,key) < 0) ||
	     (reverse && cmp_partly_bound(end_condition,key) > 0))) {
	    /* done anyway */
	    if (!sc.got) {
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    } else {
		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p, 
				     sc.accum, NIL, am_EOT), 
			   DB_ERROR_NONE);
	    }
	}
    } else {
	if (end_condition != NIL && 
	    ((!reverse && cmp_partly_bound(end_condition,key) > 0) ||
	     (reverse && cmp_partly_bound(end_condition,key) < 0))) {
	    /* done anyway */
	    RET_TO_BIF(sc.accum,DB_ERROR_NONE);
	}
    }
    /* Not done yet, let's trap. */
    sz = size_object(key);
    hp = HAlloc(p, 9 + sz);
    key = copy_struct(key, sz, &hp, &MSO(p));
    continuation = TUPLE8
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 tptr[5],
	 sc.accum,
	 tptr[7],
	 make_small(sc.got));
    RET_TO_BIF(bif_trap1(bif_export[BIF_ets_select_1], p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}
    
/*
** This is called either when the select bif traps or when ets:select/1 
** is called. It does mostly the same as db_select_tree and may in either case
** trap to itself again (via the ets:select/1 bif).
** Note that this is common for db_select_tree and db_select_chunk_tree.
*/
static int db_select_continue_tree(Process *p, 
				   DbTable *tbl,
				   Eterm continuation,
				   Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_continue_tree_common(p, &tb->common, &tb->root,
                                          continuation, ret, tb);
}

int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
                          Eterm tid, Eterm pattern, int reverse, Eterm *ret,
                          DbTableTree *stack_container)
{
    /* Strategy: Traverse backwards to build resulting list from tail to head */
    DbTreeStack* stack;
    struct select_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = NIL;
    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->keypos;
    sc.got = 0;
    sc.chunk_size = 0;

    if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(NIL,DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP_EQ(mpi.least,mpi.most)) {
	doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */);
	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
    }

    stack = get_any_stack((DbTable*)tb,stack_container);
    if (reverse) {
	if (mpi.some_limitation) {
	    if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
	    }
	    sc.end_condition = mpi.most;
	}
	traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
    } else {
	if (mpi.some_limitation) {
	    if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
	    }
	    sc.end_condition = mpi.least;
	}
	traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
    }
    release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
	erts_fprintf(stderr,"Least: %T\n", mpi.least);
	erts_fprintf(stderr,"Most: %T\n", mpi.most);
#endif
    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0) {
	RET_TO_BIF(sc.accum,DB_ERROR_NONE);
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object(key);
    hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
    key = copy_struct(key, sz, &hp, &MSO(p));
    mpb= erts_db_make_match_prog_ref(p,mpi.mp,&hp);
	    
    continuation = TUPLE8
	(hp,
	 tid,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 make_small(0), /* Chunk size of zero means not chunked to the
			   continuation BIF */
	 mpb,
	 sc.accum,
	 make_small(reverse),
	 make_small(sc.got));

    /* Don't free mpi.mp, so don't use macro */
    *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
			  Eterm pattern, int reverse, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_tree_common(p, &tb->common, &tb->root, tid,
                                 pattern, reverse, ret, tb);
}

int db_select_count_continue_tree_common(Process *p, 
                                         DbTableCommon *tb,
                                         TreeDbTerm **root,
                                         Eterm continuation,
                                         Eterm *ret,
                                         DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_count_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Eterm egot;


#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);

    /* Decode continuation. We know it's a tuple and everything else as
     this is only called by ourselves */

    /* continuation: 
       {Table, Lastkey, EndCondition, MatchProgBin, HowManyGot}*/

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 5)
	erts_exit(ERTS_ERROR_EXIT,"Internal error in ets:select_count/1");
    
    lastkey = tptr[2];
    end_condition = tptr[3];
    mp = erts_db_get_match_prog_binary(tptr[4]);
    if (!mp)
	RET_TO_BIF(NIL,DB_ERROR_BADPARAM);

    sc.p = p;
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.lastobj = NULL;
    sc.max = 1000;
    sc.keypos = tb->keypos;
    if (is_big(tptr[5])) {
	sc.got = big_to_uint32(tptr[5]);
    } else {
	sc.got = unsigned_val(tptr[5]);
    }

    stack = get_any_stack((DbTable*)tb, stack_container);
    traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
    release_stack((DbTable*)tb,stack_container,stack);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
    }	
    key = GETKEY(tb, sc.lastobj);
    if (end_condition != NIL && 
	(cmp_partly_bound(end_condition,key) > 0)) {
	/* done anyway */
	RET_TO_BIF(make_small(sc.got),DB_ERROR_NONE);
    }
    /* Not done yet, let's trap. */
    sz = size_object(key);
    if (IS_USMALL(0, sc.got)) {
	hp = HAlloc(p, sz + 6);
	egot = make_small(sc.got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
	egot = uint_to_big(sc.got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    continuation = TUPLE5
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 egot);
    RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}

/*
** This is called either when the select_count bif traps.
*/
static int db_select_count_continue_tree(Process *p, 
                                         DbTable *tbl,
                                         Eterm continuation,
                                         Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_count_continue_tree_common(p, &tb->common, &tb->root,
                                                continuation, ret, tb);    
}


int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
                                Eterm tid, Eterm pattern, Eterm *ret,
                                DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_count_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm egot;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->keypos;
    sc.got = 0;

    if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0),DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP_EQ(mpi.least,mpi.most)) {
	doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */);
	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
    }

    stack = get_any_stack((DbTable*)tb, stack_container);
    if (mpi.some_limitation) {
	if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
	    lastkey = GETKEY(tb, this->dbterm.tpl);
	}
	sc.end_condition = mpi.least;
    }
    
    traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
    release_stack((DbTable*)tb,stack_container,stack);
    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object(key);
    if (IS_USMALL(0, sc.got)) {
	hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
	egot = make_small(sc.got);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
	egot = uint_to_big(sc.got, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
	    
    continuation = TUPLE5
	(hp,
	 tid,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 mpb,
	 egot);

    /* Don't free mpi.mp, so don't use macro */
    *ret = bif_trap1(&ets_select_count_continue_exp, p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_count_tree_common(p, &tb->common, &tb->root,
                                       tid, pattern, ret, tb);
}


int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
                                Eterm tid, Eterm pattern, Sint chunk_size,
                                int reverse, Eterm *ret,
                                DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 		\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);		\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = NIL;
    sc.lastobj = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tb->keypos;
    sc.got = 0;
    sc.chunk_size = chunk_size;

    if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(am_EOT,DB_ERROR_NONE);
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP_EQ(mpi.least,mpi.most)) {
	doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */);
	if (sc.accum != NIL) {
	    hp=HAlloc(p, 3);
	    RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
	} else {
	    RET_TO_BIF(am_EOT,DB_ERROR_NONE);
	}
    }

    stack = get_any_stack((DbTable*)tb,stack_container);
    if (reverse) {
	if (mpi.some_limitation) {
	    if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
	    }
	    sc.end_condition = mpi.least;
	}
	traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
    } else {
	if (mpi.some_limitation) {
	    if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
		lastkey = GETKEY(tb, this->dbterm.tpl);
	    }
	    sc.end_condition = mpi.most;
	}
	traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
    }
    release_stack((DbTable*)tb,stack_container,stack);

    BUMP_REDS(p, 1000 - sc.max);
    if (sc.max > 0 || sc.got == chunk_size) {
	Eterm *hp; 
	unsigned sz;

	if (sc.got < chunk_size ||
	    sc.lastobj == NULL) { 
	    /* We haven't got all and we haven't trapped 
	       which should mean we are at the end of the 
	       table, sc.lastobj may be NULL if the table was empty */
	    
	    if (!sc.got) {
		RET_TO_BIF(am_EOT, DB_ERROR_NONE);
	    } else {
		RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
				     sc.accum, NIL, am_EOT), 
			   DB_ERROR_NONE);
	    }
	}

	key = GETKEY(tb, sc.lastobj);
	sz = size_object(key);
	hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
	key = copy_struct(key, sz, &hp, &MSO(p));
	mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
	
	continuation = TUPLE8
	    (hp,
	     tid,
	     key,
	     sc.end_condition, /* From the match program, 
				  needn't be copied */
	     make_small(chunk_size),
	     mpb,
	     NIL,
	     make_small(reverse),
	     make_small(0));
	/* Don't let RET_TO_BIF macro free mpi.mp*/
	*ret = bif_trap3(&ets_select_reverse_exp, p,
			 sc.accum, NIL, continuation);
	return DB_ERROR_NONE; 
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object(key);
    hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE);
    key = copy_struct(key, sz, &hp, &MSO(p));

    mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
    continuation = TUPLE8
	(hp,
	 tid,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 make_small(chunk_size),
	 mpb,
	 sc.accum,
	 make_small(reverse),
	 make_small(sc.got));
    /* Don't let RET_TO_BIF macro free mpi.mp*/
    *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation);
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
                                Eterm pattern, Sint chunk_size,
                                int reverse,
                                Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_chunk_tree_common(p, &tb->common, &tb->root,
                                       tid, pattern, chunk_size,
                                       reverse, ret, tb);
}


int db_select_delete_continue_tree_common(Process *p,
                                          DbTable *tbl,
                                          TreeDbTerm **root,
                                          Eterm continuation,
                                          Eterm *ret,
                                          DbTreeStack* stack)
{
    struct select_delete_context sc;
    unsigned sz;
    Eterm *hp; 
    Eterm lastkey;
    Eterm end_condition; 
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Eterm eaccsum;


#define RET_TO_BIF(Term, State) do { 		\
	if (sc.erase_lastterm) {		\
	    free_term(tbl, sc.lastterm);		\
	}					\
	*ret = (Term); 				\
	return State; 				\
    } while(0);

    /* Decode continuation. We know it's correct, this can only be called
       by trapping */

    tptr = tuple_val(continuation);

    lastkey = tptr[2];
    end_condition = tptr[3];

    sc.erase_lastterm = 0; /* Before first RET_TO_BIF */
    sc.lastterm = NULL;

    mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
    sc.p = p;
    sc.tb = &tbl->common;
    sc.root = root;
    sc.stack = stack;
    if (is_big(tptr[5])) {
	sc.accum = big_to_uint32(tptr[5]);
    } else {
	sc.accum = unsigned_val(tptr[5]);
    }
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.max = 1000;
    sc.keypos = tbl->common.keypos;

    traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);

    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
    }	
    key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
    if (end_condition != NIL && 
	cmp_partly_bound(end_condition,key) > 0) { /* done anyway */
	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
    }
    /* Not done yet, let's trap. */
    sz = size_object(key);
    if (IS_USMALL(0, sc.accum)) {
	hp = HAlloc(p, sz + 6);
	eaccsum = make_small(sc.accum);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
	eaccsum = uint_to_big(sc.accum, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    continuation = TUPLE5
	(hp,
	 tptr[1],
	 key,
	 tptr[3], 
	 tptr[4],
	 eaccsum);
    RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation), 
	       DB_ERROR_NONE);

#undef RET_TO_BIF
}

static int db_select_delete_continue_tree(Process *p, 
					  DbTable *tbl,
					  Eterm continuation,
					  Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
    return db_select_delete_continue_tree_common(p, tbl, &tb->root,
                                                 continuation, ret,
                                                 &tb->static_stack);
}

int db_select_delete_tree_common(Process *p, DbTable *tbl,
                                 TreeDbTerm **root,
                                 Eterm tid, Eterm pattern,
                                 Eterm *ret,
                                 DbTreeStack* stack)
{
    struct select_delete_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp; 
    TreeDbTerm *this;
    int errcode;
    Eterm mpb;
    Eterm eaccsum;

#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	if (sc.erase_lastterm) {                \
	    free_term(tbl, sc.lastterm);         \
	}                                       \
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.accum = 0;
    sc.erase_lastterm = 0;
    sc.lastterm = NULL;
    sc.p = p;
    sc.max = 1000; 
    sc.end_condition = NIL;
    sc.keypos = tbl->common.keypos;
    sc.tb = &tbl->common;
    sc.root = root;
    sc.stack = stack;
    
    if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
	RET_TO_BIF(0,errcode);
    }

    if (!mpi.something_can_match) {
	RET_TO_BIF(make_small(0),DB_ERROR_NONE);  
	/* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    if (!mpi.got_partial && mpi.some_limitation && 
	CMP_EQ(mpi.least,mpi.most)) {
	doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't
						      matter */);
	RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
    }

    if (mpi.some_limitation) {
	if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) {
	    lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
	}
	sc.end_condition = mpi.least;
    }

    traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
    BUMP_REDS(p, 1000 - sc.max);

    if (sc.max > 0) {
	RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
    }

    key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
    sz = size_object(key);
    if (IS_USMALL(0, sc.accum)) {
	hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
	eaccsum = make_small(sc.accum);
    }
    else {
	hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
	eaccsum = uint_to_big(sc.accum, hp);
	hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);
    
    continuation = TUPLE5
	(hp,
	 tid,
	 key,
	 sc.end_condition, /* From the match program, needn't be copied */
	 mpb,
	 eaccsum);

    /* Don't free mpi.mp, so don't use macro */
    if (sc.erase_lastterm) {
	free_term(tbl, sc.lastterm);
    }
    *ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation); 
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
				 Eterm pattern, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret,
                                        &tb->static_stack);
}

int db_select_replace_continue_tree_common(Process *p,
                                           DbTableCommon *tb,
                                           TreeDbTerm **root,
                                           Eterm continuation,
                                           Eterm *ret,
                                           DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_replace_context sc;
    unsigned sz;
    Eterm *hp;
    Eterm lastkey;
    Eterm end_condition;
    Binary *mp;
    Eterm key;
    Eterm *tptr;
    Eterm ereplaced;
    Sint prev_replaced;


#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);

    /* Decode continuation. We know it's a tuple and everything else as
       this is only called by ourselves */

    /* continuation:
       {Table, Lastkey, EndCondition, MatchProgBin, HowManyReplaced}*/

    tptr = tuple_val(continuation);

    if (arityval(*tptr) != 5)
        erts_exit(ERTS_ERROR_EXIT,"Internal error in ets:select_replace/1");

    lastkey = tptr[2];
    end_condition = tptr[3];
    mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);

    sc.p = p;
    sc.mp = mp;
    sc.end_condition = NIL;
    sc.lastobj = NULL;
    sc.max = 1000;
    sc.keypos = tb->keypos;
    if (is_big(tptr[5])) {
        sc.replaced = big_to_uint32(tptr[5]);
    } else {
        sc.replaced = unsigned_val(tptr[5]);
    }
    prev_replaced = sc.replaced;

    stack = get_any_stack((DbTable*)tb,stack_container);
    traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
    release_stack((DbTable*)tb,stack_container,stack);

    // the more objects we've replaced, the more reductions we've consumed
    BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));

    if (sc.max > 0) {
        RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
    }
    key = GETKEY(tb, sc.lastobj);
    if (end_condition != NIL &&
            (cmp_partly_bound(end_condition,key) > 0)) {
        /* done anyway */
        RET_TO_BIF(make_small(sc.replaced),DB_ERROR_NONE);
    }
    /* Not done yet, let's trap. */
    sz = size_object(key);
    if (IS_USMALL(0, sc.replaced)) {
        hp = HAlloc(p, sz + 6);
        ereplaced = make_small(sc.replaced);
    }
    else {
        hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
        ereplaced = uint_to_big(sc.replaced, hp);
        hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    continuation = TUPLE5
        (hp,
         tptr[1],
         key,
         tptr[3],
         tptr[4],
         ereplaced);
    RET_TO_BIF(bif_trap1(&ets_select_replace_continue_exp, p, continuation),
            DB_ERROR_NONE);

#undef RET_TO_BIF
}

static int db_select_replace_continue_tree(Process *p,
                                           DbTable *tbl,
                                           Eterm continuation,
                                           Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_replace_continue_tree_common(p, &tb->common, &tb->root,
                                                  continuation, ret, tb);
}

int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
                                  Eterm tid, Eterm pattern, Eterm *ret,
                                  DbTableTree *stack_container)
{
    DbTreeStack* stack;
    struct select_replace_context sc;
    struct mp_info mpi;
    Eterm lastkey = THE_NON_VALUE;
    Eterm key;
    Eterm continuation;
    unsigned sz;
    Eterm *hp;
    TreeDbTerm *this;
    int errcode;
    Eterm ereplaced;
    Eterm mpb;


#define RET_TO_BIF(Term,RetVal) do { 	       	\
	if (mpi.mp != NULL) {			\
	    erts_bin_free(mpi.mp);       	\
	}					\
	*ret = (Term); 				\
	return RetVal; 			        \
    } while(0)

    mpi.mp = NULL;

    sc.lastobj = NULL;
    sc.p = p;
    sc.tb = tb;
    sc.root = root;
    sc.max = 1000;
    sc.end_condition = NIL;
    sc.keypos = tb->keypos;
    sc.replaced = 0;

    if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
        RET_TO_BIF(NIL,errcode);
    }

    if (!mpi.something_can_match) {
        RET_TO_BIF(make_small(0),DB_ERROR_NONE);
        /* can't possibly match anything */
    }

    sc.mp = mpi.mp;

    stack = get_static_stack(stack_container);
    if (!mpi.got_partial && mpi.some_limitation &&
            CMP_EQ(mpi.least,mpi.most)) {
        TreeDbTerm* term = *(mpi.save_term);
        doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */);
        if (stack != NULL) {
            if (TOP_NODE(stack) == term)
                // throw away potentially invalid reference
                REPLACE_TOP_NODE(stack, *(mpi.save_term));
            release_stack((DbTable*)tb,stack_container, stack);
        }
        RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
    }

    if (stack == NULL)
        stack = get_any_stack((DbTable*)tb,stack_container);

    if (mpi.some_limitation) {
        if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
            lastkey = GETKEY(tb, this->dbterm.tpl);
        }
        sc.end_condition = mpi.least;
    }

    traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
    release_stack((DbTable*)tb,stack_container,stack);
    // the more objects we've replaced, the more reductions we've consumed
    BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
    if (sc.max > 0) {
        RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
    }

    key = GETKEY(tb, sc.lastobj);
    sz = size_object(key);
    if (IS_USMALL(0, sc.replaced)) {
        hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
        ereplaced = make_small(sc.replaced);
    }
    else {
        hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6);
        ereplaced = uint_to_big(sc.replaced, hp);
        hp += BIG_UINT_HEAP_SIZE;
    }
    key = copy_struct(key, sz, &hp, &MSO(p));
    mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp);

    continuation = TUPLE5
        (hp,
         tid,
         key,
         sc.end_condition, /* From the match program, needn't be copied */
         mpb,
         ereplaced);

    /* Don't free mpi.mp, so don't use macro */
    *ret = bif_trap1(&ets_select_replace_continue_exp, p, continuation);
    return DB_ERROR_NONE;

#undef RET_TO_BIF

}

static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_select_replace_tree_common(p, &tb->common, &tb->root,
                                         tid, pattern, ret, tb);
}

int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
                        Eterm key, Eterm *ret,
                        DbTreeStack *stack /* NULL if no static stack */)
{
    TreeDbTerm *this;

    *ret = NIL;
    this = linkout_tree(&tbl->common, root, key, stack);
    if (this) {
        Eterm copy, *hp, *hend;

        hp = HAlloc(p, this->dbterm.size + 2);
        hend = hp + this->dbterm.size + 2;
        copy = db_copy_object_from_ets(&tbl->common,
                                       &this->dbterm, &hp, &MSO(p));
        *ret = CONS(hp, copy, NIL);
        hp += 2;
        HRelease(p, hend, hp);
        free_term(tbl, this);
    }
    return DB_ERROR_NONE;
}

static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableTree *tb = &tbl->tree;
    return db_take_tree_common(p, tbl, &tb->root,
                               key, ret, &tb->static_stack);
}

/*
** Other interface routines (not directly coupled to one bif)
*/

void db_print_tree_common(fmtfn_t to, void *to_arg,
                          int show, TreeDbTerm *root, DbTable *tbl)
{
#ifdef TREE_DEBUG
    if (show)
	erts_print(to, to_arg, "\nTree data dump:\n"
		   "------------------------------------------------\n");
    do_dump_tree2(&tbl->common, to, to_arg, show, root, 0);
    if (show)
	erts_print(to, to_arg, "\n"
		   "------------------------------------------------\n");
#else
    erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tbl));
#endif
}

/* Display tree contents (for dump) */
static void db_print_tree(fmtfn_t to, void *to_arg,
			  int show,
			  DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    db_print_tree_common(to, to_arg, show, tb->root, tbl);
}

/* release all memory occupied by a single table */
static int db_free_empty_table_tree(DbTable *tbl)
{
    ASSERT(tbl->tree.root == NULL);
    while (db_free_table_continue_tree(tbl, ERTS_SWORD_MAX) < 0)
	;
    return 1;
}

static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds)
{
    DbTableTree *tb = &tbl->tree;

    if (!tb->deletion) {
	tb->static_stack.pos = 0;
	tb->deletion = 1;
	PUSH_NODE(&tb->static_stack, tb->root);
    }
    reds = do_free_tree_continue(tb, reds);
    if (reds >= 0) {		/* Completely done. */
	erts_db_free(ERTS_ALC_T_DB_STK,
		     (DbTable *) tb,
		     (void *) tb->static_stack.array,
		     sizeof(TreeDbTerm *) * STACK_NEED);
	ASSERT((erts_atomic_read_nob(&tb->common.memory_size)
	       == sizeof(DbTable)) ||
               (erts_atomic_read_nob(&tb->common.memory_size)
                == (sizeof(DbTable) + sizeof(DbFixation))));
    }
    return reds;
}

static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds)
{
    reds = db_free_table_continue_tree(tbl, reds);
    if (reds < 0)
        return reds;
    db_create_tree(p, tbl);
    erts_atomic_set_nob(&tbl->tree.common.nitems, 0);
    return reds;
}

static void do_db_tree_foreach_offheap(TreeDbTerm *,
				       void (*)(ErlOffHeap *, void *),
				       void *);

void db_foreach_offheap_tree_common(TreeDbTerm *root,
                                    void (*func)(ErlOffHeap *, void *),
                                    void * arg)
{
    do_db_tree_foreach_offheap(root, func, arg);
}

static void db_foreach_offheap_tree(DbTable *tbl,
				    void (*func)(ErlOffHeap *, void *),
				    void * arg)
{
    db_foreach_offheap_tree_common(tbl->tree.root, func, arg);
}


/*
** Functions for internal use
*/


static void
do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
			   void (*func)(ErlOffHeap *, void *),
			   void * arg)
{
    ErlOffHeap tmp_offheap;
    if(!tdbt)
	return;
    do_db_tree_foreach_offheap(tdbt->left, func, arg);
    tmp_offheap.first = tdbt->dbterm.first_oh;
    tmp_offheap.overhead = 0;
    (*func)(&tmp_offheap, arg);
    tdbt->dbterm.first_oh = tmp_offheap.first;
    do_db_tree_foreach_offheap(tdbt->right, func, arg);
}

static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
                                Eterm key, DbTreeStack *stack) {
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = root;
    Sint c;
    int dir;
    TreeDbTerm *q = NULL;

    /*
     * Somewhat complicated, deletion in an AVL tree,
     * The two helpers balance_left and balance_right are used to
     * keep the balance. As in insert, we do the stacking ourselves.
     */

    reset_stack(stack);
    dstack[dpos++] = DIR_END;
    for (;;) {
	if (!*this) { /* Failure */
	    return NULL;
	} else if ((c = cmp_key(tb, key, *this)) < 0) {
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else { /* Equal key, found the one to delete*/
	    q = (*this);
	    if (q->right == NULL) {
		(*this) = q->left;
		state = 1;
	    } else if (q->left == NULL) {
		(*this) = q->right;
		state = 1;
	    } else {
		dstack[dpos++] = DIR_LEFT;
		tstack[tpos++] = this;
		state = delsub(this);
	    }
	    erts_atomic_dec_nob(&tb->nitems);
	    break;
	}
    }
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	if (dir == DIR_LEFT) {
	    state = tree_balance_left(this);
	} else {
	    state = tree_balance_right(this);
	}
    }
    return q;
}

static TreeDbTerm *linkout_object_tree(DbTableCommon *tb,  TreeDbTerm **root,
				       Eterm object, DbTableTree *stack)
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = root;
    Sint c;
    int dir;
    TreeDbTerm *q = NULL;
    Eterm key;

    /*
     * Somewhat complicated, deletion in an AVL tree,
     * The two helpers balance_left and balance_right are used to
     * keep the balance. As in insert, we do the stacking ourselves.
     */

    
    key = GETKEY(tb, tuple_val(object));

    reset_static_stack(stack);
    dstack[dpos++] = DIR_END;
    for (;;) {
	if (!*this) { /* Failure */
	    return NULL;
	} else if ((c = cmp_key(tb,key,*this)) < 0) {
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else if (c > 0) { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	} else { /* Equal key, found the only possible matching object*/
	    if (!db_eq(tb,object,&(*this)->dbterm)) {
		return NULL;
	    }
	    q = (*this);
	    if (q->right == NULL) {
		(*this) = q->left;
		state = 1;
	    } else if (q->left == NULL) {
		(*this) = q->right;
		state = 1;
	    } else {
		dstack[dpos++] = DIR_LEFT;
		tstack[tpos++] = this;
		state = delsub(this);
	    }
	    erts_atomic_dec_nob(&tb->nitems);
	    break;
	}
    }
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	if (dir == DIR_LEFT) {
	    state = tree_balance_left(this);
	} else {
	    state = tree_balance_right(this);
	}
    }
    return q;
}

/*
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
                           extra_match_validator_t extra_validator, /* Optional callback */
                           struct mp_info *mpi)
{
    Eterm lst, tpl, ttpl;
    Eterm *matches,*guards, *bodies;
    Eterm sbuff[30];
    Eterm *buff = sbuff;
    Eterm *ptpl;
    int i;
    int num_heads = 0;
    Eterm key;
    Eterm partly_bound;
    int res;
    Eterm least = 0;
    Eterm most = 0;

    mpi->some_limitation = 1;
    mpi->got_partial = 0;
    mpi->something_can_match = 0;
    mpi->mp = NULL;
    mpi->save_term = NULL;

    for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
	++num_heads;

    if (lst != NIL) {/* proper list... */
	return DB_ERROR_BADPARAM;
    }
    if (num_heads > 10) {
	buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
    }

    matches = buff;
    guards = buff + num_heads;
    bodies = buff + (num_heads * 2);

    i = 0;
    for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
        Eterm match;
        Eterm guard;
        Eterm body;

	ttpl = CAR(list_val(lst));
	if (!is_tuple(ttpl)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	ptpl = tuple_val(ttpl);
	if (ptpl[0] != make_arityval(3U)) {
	    if (buff != sbuff) { 
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
	    return DB_ERROR_BADPARAM;
	}
	matches[i] = match = tpl = ptpl[1];
	guards[i] = guard = ptpl[2];
	bodies[i] = body = ptpl[3];

        if(extra_validator != NULL && !extra_validator(tb->keypos, match, guard, body)) {
	    if (buff != sbuff) {
		erts_free(ERTS_ALC_T_DB_TMP, buff);
	    }
            return DB_ERROR_BADPARAM;
        }

	if (!is_list(body) || CDR(list_val(body)) != NIL ||
	    CAR(list_val(body)) != am_DollarUnderscore) {
	}
	++i;

	partly_bound = NIL;
	res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound);
	if ( res >= 0 ) {   /* Can match something */
	    key = 0;
	    mpi->something_can_match = 1;
	    if (res > 0) {
		key = GETKEY(tb,tuple_val(tpl)); 
	    } else if (partly_bound != NIL) {
		mpi->got_partial = 1;
		key = partly_bound;
	    } else {
		mpi->some_limitation = 0;
	    }
	    if (key != 0) {
		if (least == 0 || 
		    partly_bound_can_match_lesser(key,least)) {
		    least = key;
		}
		if (most == 0 || 
		    partly_bound_can_match_greater(key,most)) {
		    most = key;
		}
	    }
	}
    }
    mpi->least = least;
    mpi->most = most;

    /*
     * It would be nice not to compile the match_spec if nothing could match,
     * but then the select calls would not fail like they should on bad 
     * match specs that happen to specify non existent keys etc.
     */
    if ((mpi->mp = db_match_compile(matches, guards, bodies,
				    num_heads, DCOMP_TABLE, NULL)) 
	== NULL) {
	if (buff != sbuff) { 
	    erts_free(ERTS_ALC_T_DB_TMP, buff);
	}
	return DB_ERROR_BADPARAM;
    }
    if (buff != sbuff) { 
	erts_free(ERTS_ALC_T_DB_TMP, buff);
    }
    return DB_ERROR_NONE;
}

static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
{
    TreeDbTerm *root;
    TreeDbTerm *p;

    for (;;) {
	root = POP_NODE(&tb->static_stack);
	if (root == NULL) break;
	for (;;) {
	    if ((p = root->left) != NULL) {
		root->left = NULL;
		PUSH_NODE(&tb->static_stack, root);
		root = p;
	    } else if ((p = root->right) != NULL) {
		root->right = NULL;
		PUSH_NODE(&tb->static_stack, root);
		root = p;
	    } else {
		free_term((DbTable*)tb, root);
		if (--reds < 0) {
                    return reds;   /* Done enough for now */
                }
                break;
	    }
	}
    }
    return reds;
}

/*
 * Deletion helpers
 */
int tree_balance_left(TreeDbTerm **this) 
{
    TreeDbTerm *p, *p1, *p2;
    int b1, b2, h = 1;
    
    p = *this;
    switch (p->balance) {
    case -1:
	p->balance = 0;
	break;
    case 0:
	p->balance = 1;
	h = 0;
	break;
    case 1:
	p1 = p->right;
	b1 = p1->balance;
	if (b1 >= 0) { /* Single RR rotation */
	    p->right = p1->left;
	    p1->left = p;
	    if (b1 == 0) {
		p->balance = 1;
		p1->balance = -1;
		h = 0;
	    } else {
		p->balance = p1->balance = 0;
	    }
	    (*this) = p1;
	} else { /* Double RL rotation */
	    p2 = p1->left;
	    b2 = p2->balance;
	    p1->left = p2->right;
	    p2->right = p1;
	    p->right = p2->left;
	    p2->left = p;
	    p->balance = (b2 == 1) ? -1 : 0;
	    p1->balance = (b2 == -1) ? 1 : 0;
	    p2->balance = 0;
	    (*this) = p2;
	}
	break;
    }
    return h;
}

int tree_balance_right(TreeDbTerm **this) 
{
    TreeDbTerm *p, *p1, *p2;
    int b1, b2, h = 1;
    
    p = *this;
    switch (p->balance) {
    case 1:
	p->balance = 0;
	break;
    case 0:
	p->balance = -1;
	h = 0;
	break;
    case -1:
	p1 = p->left;
	b1 = p1->balance;
	if (b1 <= 0) { /* Single LL rotation */
	    p->left = p1->right;
	    p1->right = p;
	    if (b1 == 0) {
		p->balance = -1;
		p1->balance = 1;
		h = 0;
	    } else {
		p->balance = p1->balance = 0;
	    }
	    (*this) = p1;
	} else { /* Double LR rotation */
	    p2 = p1->right;
	    b2 = p2->balance;
	    p1->right = p2->left;
	    p2->left = p1;
	    p->left = p2->right;
	    p2->right = p;
	    p->balance = (b2 == -1) ? 1 : 0;
	    p1->balance = (b2 == 1) ? -1 : 0;
	    p2->balance = 0;
	    (*this) = p2;
	}
    }
    return h;
}

static int delsub(TreeDbTerm **this) 
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    TreeDbTerm *q = (*this);
    TreeDbTerm **r = &(q->left);
    int h;

    /*
     * Walk down the tree to the right and search 
     * for a void right child, pick that child out
     * and return it to be put in the deleted 
     * object's place.
     */
    
    while ((*r)->right != NULL) {
	tstack[tpos++] = r;
	r = &((*r)->right);
    }
    *this = *r;
    *r = (*r)->left;
    (*this)->left = q->left;
    (*this)->right = q->right;
    (*this)->balance = q->balance;
    tstack[0] = &((*this)->left);
    h = 1;
    while (tpos && h) {
	r = tstack[--tpos];
	h = tree_balance_right(r);
    }
    return h;
}

/*
 * Helper for db_slot
 */

static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
                               Sint slot, DbTable *tb,
                               DbTableTree *stack_container)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    DbTreeStack* stack = get_any_stack(tb,stack_container);
    ASSERT(stack != NULL);

    if (slot == 1) { /* Don't search from where we are if we are 
			looking for the first slot */
	stack->slot = 0;
    }

    if (stack->slot == 0) { /* clear stack if slot positions 
				are not recorded */
	stack->pos = 0;
    }
    if (EMPTY_NODE(stack)) {
	this = root;
	if (this == NULL)
	    goto done;
	while (this->left != NULL){
	    PUSH_NODE(stack, this);
	    this = this->left;
	}
	PUSH_NODE(stack, this);
	stack->slot = 1;
    }
    this = TOP_NODE(stack);
    while (stack->slot != slot && this != NULL) {
	if (slot > stack->slot) {
	    if (this->right != NULL) {
		this = this->right;
		while (this->left != NULL) {
		    PUSH_NODE(stack, this);
		    this = this->left;
		}
		PUSH_NODE(stack, this);
	    } else {
		for (;;) {
		    tmp = POP_NODE(stack);
		    this = TOP_NODE(stack);
		    if (this == NULL || this->left == tmp)
			break;
		}
	    }		
	    ++(stack->slot);
	} else {
	    if (this->left != NULL) {
		this = this->left;
		while (this->right != NULL) {
		    PUSH_NODE(stack, this);
		    this = this->right;
		}
		PUSH_NODE(stack, this);
	    } else {
		for (;;) {
		    tmp = POP_NODE(stack);
		    this = TOP_NODE(stack);
		    if (this == NULL || this->right == tmp)
			break;
		}
	    }		
	    --(stack->slot);
	}
    }
done:
    release_stack(tb,stack_container,stack);
    return this;
}

/*
 * Find next and previous in sort order
 */

static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
                             DbTreeStack* stack, Eterm key) {
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    if(( this = TOP_NODE(stack)) != NULL) {
	if (!cmp_key_eq(tb,key,this)) {
	    /* Start from the beginning */
	    stack->pos = stack->slot = 0;
	}
    }
    if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
	if (( this = root ) == NULL)
	    return NULL;
	for (;;) {
	    PUSH_NODE(stack, this);
	    if (( c = cmp_key(tb,key,this) ) > 0) {
		if (this->right == NULL) /* We are at the previos 
					    and the element does
					    not exist */
		    break;
		else
		    this = this->right;
	    } else if (c < 0) {
		if (this->left == NULL) /* Done */
		    return this;
		else
		    this = this->left;
	    } else
		break;
	}
    }
    /* The next element from this... */
    if (this->right != NULL) {
	this = this->right;
	PUSH_NODE(stack,this);
	while (this->left != NULL) {
	    this = this->left;
	    PUSH_NODE(stack, this);
	}
	if (stack->slot > 0) 
	    ++(stack->slot);
    } else {
	do {
	    tmp = POP_NODE(stack);
	    if (( this = TOP_NODE(stack)) == NULL) {
		stack->slot = 0;
		return NULL;
	    }
	} while (this->right == tmp);
	if (stack->slot > 0) 
	    ++(stack->slot);
    }
    return this;
}

static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
                             DbTreeStack* stack, Eterm key) {
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    if(( this = TOP_NODE(stack)) != NULL) {
	if (!cmp_key_eq(tb,key,this)) {
	    /* Start from the beginning */
	    stack->pos = stack->slot = 0;
	}
    }
    if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
	if (( this = root ) == NULL)
	    return NULL;
	for (;;) {
	    PUSH_NODE(stack, this);
	    if (( c = cmp_key(tb,key,this) ) < 0) {
		if (this->left == NULL) /* We are at the next 
					   and the element does
					   not exist */
		    break;
		else
		    this = this->left;
	    } else if (c > 0) {
		if (this->right == NULL) /* Done */
		    return this;
		else
		    this = this->right;
	    } else
		break;
	}
    }
    /* The previous element from this... */
    if (this->left != NULL) {
	this = this->left;
	PUSH_NODE(stack,this);
	while (this->right != NULL) {
	    this = this->right;
	    PUSH_NODE(stack, this);
	}
	if (stack->slot > 0) 
	    --(stack->slot);
    } else {
	do {
	    tmp = POP_NODE(stack);
	    if (( this = TOP_NODE(stack)) == NULL) {
		stack->slot = 0;
		return NULL;
	    }
	} while (this->left == tmp);
	if (stack->slot > 0) 
	    --(stack->slot);
    }
    return this;
}

static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
                                         DbTreeStack* stack, Eterm key)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    /* spool the stack, we have to "re-search" */
    stack->pos = stack->slot = 0;
    if (( this = root ) == NULL)
	return NULL;
    for (;;) {
	PUSH_NODE(stack, this);
	if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) {
	    if (this->right == NULL) {
		do {
		    tmp = POP_NODE(stack);
		    if (( this = TOP_NODE(stack)) == NULL) {
			return NULL;
		    }
		} while (this->right == tmp);
		return this;
	    } else
		this = this->right;
	} else /*if (c < 0)*/ {
	    if (this->left == NULL) /* Done */
		return this;
	    else
		this = this->left;
	} 
    }
}

static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
                                         DbTreeStack* stack, Eterm key)
{
    TreeDbTerm *this;
    TreeDbTerm *tmp;
    Sint c;

    /* spool the stack, we have to "re-search" */
    stack->pos = stack->slot = 0;
    if (( this = root ) == NULL)
	return NULL;
    for (;;) {
	PUSH_NODE(stack, this);
	if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) {
	    if (this->left == NULL) {
		do {
		    tmp = POP_NODE(stack);
		    if (( this = TOP_NODE(stack)) == NULL) {
			return NULL;
		    }
		} while (this->left == tmp);
		return this;
	    } else
		this = this->left;
	} else /*if (c < 0)*/ {
	    if (this->right == NULL) /* Done */
		return this;
	    else
		this = this->right;
	} 
    }
}


/*
 * Just lookup a node
 */
static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
                             Eterm key, DbTableTree *stack_container)
{
    TreeDbTerm *this;
    Sint res;
    DbTreeStack* stack = get_static_stack(stack_container);

    if(!stack || EMPTY_NODE(stack)
       || !cmp_key_eq(tb, key, (this=TOP_NODE(stack)))) {

	this = root;
	while (this != NULL && (res = cmp_key(tb,key,this)) != 0) {
	    if (res < 0)
		this = this->left;
	    else
		this = this->right;
	}
    }
    if (stack) {
	release_stack((DbTable*)tb,stack_container,stack);
    }
    return this;
}

/*
 * Lookup a node and return the address of the node pointer in the tree
 */
static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key)
{
    TreeDbTerm **this;
    Sint res;

    this = root;
    while ((*this) != NULL && (res = cmp_key(tb, key, *this)) != 0) {
	if (res < 0)
	    this = &((*this)->left);
	else
	    this = &((*this)->right);
    }
    if (*this == NULL)
	return NULL;
    return this;
}

/*
 * Find node and return the address of the node pointer (NULL if not found)
 * Tries to reuse the existing stack for performance.
 */

static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
                             DbTreeStack *stack, TreeDbTerm *this) {
    Eterm key = GETKEY(tb, this->dbterm.tpl);
    TreeDbTerm *tmp;
    TreeDbTerm *parent;
    Sint c;

    if(( tmp = TOP_NODE(stack)) != NULL) {
	if (!cmp_key_eq(tb,key,tmp)) {
	    /* Start from the beginning */
	    stack->pos = stack->slot = 0;
	}
    }
    if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
	if (( tmp = *root ) == NULL)
	    return NULL;
	for (;;) {
	    PUSH_NODE(stack, tmp);
	    if (( c = cmp_key(tb,key,tmp) ) < 0) {
		if (tmp->left == NULL) /* We are at the next
					   and the element does
					   not exist */
		    break;
		else
		    tmp = tmp->left;
	    } else if (c > 0) {
		if (tmp->right == NULL) /* Done */
		    return NULL;
		else
		    tmp = tmp->right;
	    } else
		break;
	}
    }

    if (TOP_NODE(stack) != this)
        return NULL;

    parent = TOPN_NODE(stack, 1);
    if (parent == NULL)
        return ((this != *root) ? NULL : root);
    if (parent->left == this)
        return &(parent->left);
    if (parent->right == this)
        return &(parent->right);
    return NULL;
}

int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
                                 Eterm key, Eterm obj, DbUpdateHandle* handle,
                                 DbTableTree *stack_container)
{
    TreeDbTerm **pp = find_node2(&tbl->common, root, key);
    int flags = 0;

    if (pp == NULL) {
        if (obj == THE_NON_VALUE) {
            return 0;
        } else {
            Eterm *objp = tuple_val(obj);
            int arity = arityval(*objp);
            Eterm *htop, *hend;

            ASSERT(arity >= tbl->common.keypos);
            htop = HAlloc(p, arity + 1);
            hend = htop + arity + 1;
            sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
            htop[tbl->common.keypos] = key;
            obj = make_tuple(htop);

            if (db_put_tree_common(&tbl->common, root,
                                   obj, 1, stack_container) != DB_ERROR_NONE) {
                return 0;
            }

            pp = find_node2(&tbl->common, root, key);
            ASSERT(pp != NULL);
            HRelease(p, hend, htop);
            flags |= DB_NEW_OBJECT;
        }
    }

    handle->tb = tbl;
    handle->dbterm = &(*pp)->dbterm;
    handle->flags = flags;
    handle->bp = (void**) pp;
    handle->new_size = (*pp)->dbterm.size;
    return 1;
}

static int
db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                      DbUpdateHandle* handle)
{
    DbTableTree *tb = &tbl->tree;
    return db_lookup_dbterm_tree_common(p, tbl, &tb->root, key, obj, handle, tb);
}

void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
                                    DbTableTree *stack_container)
{
    DbTable *tbl = handle->tb;
    TreeDbTerm *bp = (TreeDbTerm *) *handle->bp;

    if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
        Eterm ret;
        db_erase_tree(tbl, GETKEY(&tbl->common, bp->dbterm.tpl), &ret);
    } else if (handle->flags & DB_MUST_RESIZE) {
	db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm));
        reset_static_stack(stack_container);

        free_term(tbl, bp);
    }
#ifdef DEBUG
    handle->dbterm = 0;
#endif
    return;
}   

static void
db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
{
    DbTable *tbl = handle->tb;
    DbTableTree *tb = &tbl->tree;
    db_finalize_dbterm_tree_common(cret, handle, tb);
}

/*
 * Traverse the tree with a callback function, used by db_match_xxx
 */
static void traverse_backwards(DbTableCommon *tb,
                               TreeDbTerm **root,
			       DbTreeStack* stack,
			       Eterm lastkey,
			       int (*doit)(DbTableCommon *,
					   TreeDbTerm *,
					   void *,
					   int),
			       void *context) 
{
    TreeDbTerm *this, *next;

    if (lastkey == THE_NON_VALUE) {
	stack->pos = stack->slot = 0;
	if (( this = *root ) == NULL) {
	    return;
	}
	while (this != NULL) {
	    PUSH_NODE(stack, this);
	    this = this->right;
	}
	this = TOP_NODE(stack);
	next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
	if (!((*doit)(tb, this, context, 0)))
	    return;
    } else {
	next = find_prev(tb, *root, stack, lastkey);
    }

    while ((this = next) != NULL) {
	next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
	if (!((*doit)(tb, this, context, 0)))
	    return;
    }
}

/*
 * Traverse the tree with a callback function, used by db_match_xxx
 */
static void traverse_forward(DbTableCommon *tb,
                             TreeDbTerm **root,
			     DbTreeStack* stack,
			     Eterm lastkey,
			     int (*doit)(DbTableCommon *,
					 TreeDbTerm *,
					 void *,
					 int),
			     void *context) 
{
    TreeDbTerm *this, *next;

    if (lastkey == THE_NON_VALUE) {
	stack->pos = stack->slot = 0;
	if (( this = *root ) == NULL) {
	    return;
	}
	while (this != NULL) {
	    PUSH_NODE(stack, this);
	    this = this->left;
	}
	this = TOP_NODE(stack);
	next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
	if (!((*doit)(tb, this, context, 1)))
	    return;
    } else {
	next = find_next(tb, *root, stack, lastkey);
    }

    while ((this = next) != NULL) {
	next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
	if (!((*doit)(tb, this, context, 1)))
	    return;
    }
}

/*
 * Traverse the tree with an update callback function, used by db_select_replace
 */
static void traverse_update_backwards(DbTableCommon *tb,
                                      TreeDbTerm **root,
                                      DbTreeStack* stack,
                                      Eterm lastkey,
                                      int (*doit)(DbTableCommon*,
                                                  TreeDbTerm**,
                                                  void*,
                                                  int),
                                      void* context)
{
    int res;
    TreeDbTerm *this, *next, **this_ptr;

    if (lastkey == THE_NON_VALUE) {
        stack->pos = stack->slot = 0;
        if (( this = *root ) == NULL) {
            return;
        }
        while (this != NULL) {
            PUSH_NODE(stack, this);
            this = this->right;
        }
        this = TOP_NODE(stack);
        this_ptr = find_ptr(tb, root, stack, this);
        ASSERT(this_ptr != NULL);
        res = (*doit)(tb, this_ptr, context, 0);
        REPLACE_TOP_NODE(stack, *this_ptr);
        next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
        if (!res)
            return;
    } else {
        next = find_prev(tb, *root, stack, lastkey);
    }

    while ((this = next) != NULL) {
        this_ptr = find_ptr(tb, root, stack, this);
        ASSERT(this_ptr != NULL);
        res = (*doit)(tb, this_ptr, context, 0);
        REPLACE_TOP_NODE(stack, *this_ptr);
        next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
        if (!res)
            return;
    }
}

/*
 * Returns 0 if not given 1 if given and -1 on no possible match
 * if key is given; *ret is set to point to the object concerned.
 */
static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
                     TreeDbTerm ***ret, Eterm *partly_bound)
{
    TreeDbTerm **this;
    Eterm key;

    ASSERT(ret != NULL);
    if (pattern == am_Underscore || db_is_variable(pattern) != -1)
	return 0;
    key = db_getkey(tb->keypos, pattern);
    if (is_non_value(key))
	return -1;  /* can't possibly match anything */
    if (!db_has_variable(key)) {   /* Bound key */
	if (( this = find_node2(tb, root, key) ) == NULL) {
	    return -1;
	}
	*ret = this;
	return 1;
    } else if (partly_bound != NULL && key != am_Underscore && 
	       db_is_variable(key) < 0 && !db_has_map(key))
	*partly_bound = key;
	
    return 0;
}



static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
{
    Eterm* aa;
    Eterm* bb;
    Eterm a_hdr;
    Eterm b_hdr;
    int i;
    Sint j;

    /* A variable matches anything */
    if (is_atom(a) && (a == am_Underscore || (db_is_variable(a) >= 0))) {
	*done = 1;
	return 0;
    }
    if (is_same(a,b))
	return 0;
    
    switch (a & _TAG_PRIMARY_MASK) {
    case TAG_PRIMARY_LIST:
	if (!is_list(b)) {
	    return CMP(a,b);
	}
	aa = list_val(a);
	bb = list_val(b);
	while (1) {
	    if ((j = do_cmp_partly_bound(*aa++, *bb++, done)) != 0 || *done)
		return j;
	    if (is_same(*aa, *bb))
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_cmp_partly_bound(*aa, *bb, done);
	    aa = list_val(*aa);
	    bb = list_val(*bb);
	}
    case TAG_PRIMARY_BOXED:
	if ((b & _TAG_PRIMARY_MASK) != TAG_PRIMARY_BOXED) {
	    return CMP(a,b);
	}
	a_hdr = ((*boxed_val(a)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
	b_hdr = ((*boxed_val(b)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
	if (a_hdr != b_hdr) {
	    return CMP(a,b);
	}
	if (a_hdr == (_TAG_HEADER_ARITYVAL >> _TAG_PRIMARY_SIZE)) {
	    aa = tuple_val(a);
	    bb = tuple_val(b);
	    /* compare the arities */
	    i = arityval(*aa);	/* get the arity*/
	    if (i < arityval(*bb)) return(-1);
	    if (i > arityval(*bb)) return(1);
	    while (i--) {
		if ((j = do_cmp_partly_bound(*++aa, *++bb, done)) != 0
		    || *done) 
		    return j;
	    }
	    return 0;
	}
	/* Drop through */
      default:
	  return CMP(a,b);
    }
}

static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) {
    int done = 0;
    Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\ncmp_partly_bound: %T", partly_bound_key);
    if (ret < 0)
	erts_fprintf(stderr," < ");
    else if (ret > 0)
	erts_fprintf(stderr," > ");
    else
	erts_fprintf(stderr," == ");
    erts_fprintf(stderr,"%T\n", bound_key);
#endif
    return ret;
}

/*
** For partly_bound debugging....
**
BIF_RETTYPE ets_testnisse_2(BIF_ALIST_2)
BIF_ADECL_2
{
    Eterm r1 = make_small(partly_bound_can_match_lesser(BIF_ARG_1,
							BIF_ARG_2));
    Eterm r2 = make_small(partly_bound_can_match_greater(BIF_ARG_1,
							 BIF_ARG_2));
    Eterm *hp = HAlloc(BIF_P,3);
    Eterm ret;

    ret = TUPLE2(hp,r1,r2);
    BIF_RET(ret);
}
**
*/
static int partly_bound_can_match_lesser(Eterm partly_bound_1, 
					 Eterm partly_bound_2) 
{
    int done = 0;
    int ret = do_partly_bound_can_match_lesser(partly_bound_1, 
					       partly_bound_2, 
					       &done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\npartly_bound_can_match_lesser: %T",partly_bound_1);
    if (ret)
	erts_fprintf(stderr," can match lesser than ");
    else
	erts_fprintf(stderr," cannot match lesser than ");
    erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
    return ret;
}

static int partly_bound_can_match_greater(Eterm partly_bound_1, 
					  Eterm partly_bound_2) 
{
    int done = 0;
    int ret = do_partly_bound_can_match_greater(partly_bound_1, 
						partly_bound_2, 
						&done);
#ifdef HARDDEBUG
    erts_fprintf(stderr,"\npartly_bound_can_match_greater: %T",partly_bound_1);
    if (ret)
	erts_fprintf(stderr," can match greater than ");
    else
	erts_fprintf(stderr," cannot match greater than ");
    erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
    return ret;
}

static int do_partly_bound_can_match_lesser(Eterm a, Eterm b, 
					    int *done)
{
    Eterm* aa;
    Eterm* bb;
    Sint i;
    int j;

    if (is_atom(a) && (a == am_Underscore || 
		       (db_is_variable(a) >= 0))) {
	*done = 1;
	if (is_atom(b) && (b == am_Underscore || 
			   (db_is_variable(b) >= 0))) {
	    return 0;
	} else {
	    return 1;
	}
    } else if (is_atom(b) && (b == am_Underscore || 
			      (db_is_variable(b) >= 0))) {
	*done = 1;
	return 0;
    }

    if (a == b)
	return 0;

    if (not_eq_tags(a,b)) {
	*done = 1;
	return (CMP(a, b) < 0) ? 1 : 0;
    }

    /* we now know that tags are the same */
    switch (tag_val_def(a)) {
    case TUPLE_DEF:
	aa = tuple_val(a);
	bb = tuple_val(b);
	/* compare the arities */
	if (arityval(*aa) < arityval(*bb)) return 1;
	if (arityval(*aa) > arityval(*bb)) return 0;
	i = arityval(*aa);	/* get the arity*/
	while (i--) {
	    if ((j = do_partly_bound_can_match_lesser(*++aa, *++bb, 
						      done)) != 0 
		|| *done) 
		return j;
	}
	return 0;
    case LIST_DEF:
	aa = list_val(a);
	bb = list_val(b);
	while (1) {
	    if ((j = do_partly_bound_can_match_lesser(*aa++, *bb++, 
						      done)) != 0 
		|| *done) 
		return j;
	    if (*aa==*bb)
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_partly_bound_can_match_lesser(*aa, *bb, 
							done);
	    aa = list_val(*aa);
	    bb = list_val(*bb);
	}
    default:
	if((i = CMP(a, b)) != 0) {
	    *done = 1;
	}
	return (i < 0) ? 1 : 0;
    }
}

static int do_partly_bound_can_match_greater(Eterm a, Eterm b, 
					    int *done)
{
    Eterm* aa;
    Eterm* bb;
    Sint i;
    int j;

    if (is_atom(a) && (a == am_Underscore || 
		       (db_is_variable(a) >= 0))) {
	*done = 1;
	if (is_atom(b) && (b == am_Underscore || 
			   (db_is_variable(b) >= 0))) {
	    return 0;
	} else {
	    return 1;
	}
    } else if (is_atom(b) && (b == am_Underscore || 
			      (db_is_variable(b) >= 0))) {
	*done = 1;
	return 0;
    }

    if (a == b)
	return 0;

    if (not_eq_tags(a,b)) {
	*done = 1;
	return (CMP(a, b) > 0) ? 1 : 0;
    }

    /* we now know that tags are the same */
    switch (tag_val_def(a)) {
    case TUPLE_DEF:
	aa = tuple_val(a);
	bb = tuple_val(b);
	/* compare the arities */
	if (arityval(*aa) < arityval(*bb)) return 0;
	if (arityval(*aa) > arityval(*bb)) return 1;
	i = arityval(*aa);	/* get the arity*/
	while (i--) {
	    if ((j = do_partly_bound_can_match_greater(*++aa, *++bb, 
						      done)) != 0 
		|| *done) 
		return j;
	}
	return 0;
    case LIST_DEF:
	aa = list_val(a);
	bb = list_val(b);
	while (1) {
	    if ((j = do_partly_bound_can_match_greater(*aa++, *bb++, 
						      done)) != 0 
		|| *done) 
		return j;
	    if (*aa==*bb)
		return 0;
	    if (is_not_list(*aa) || is_not_list(*bb))
		return do_partly_bound_can_match_greater(*aa, *bb, 
							done);
	    aa = list_val(*aa);
	    bb = list_val(*bb);
	}
    default:
	if((i = CMP(a, b)) != 0) {
	    *done = 1;
	}
	return (i > 0) ? 1 : 0;
    }
}

/*
 * Callback functions for the different match functions
 */

static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
		       int forward)
{
    struct select_context *sc = (struct select_context *) ptr;
    Eterm ret;
    Eterm* hp;

    sc->lastobj = this->dbterm.tpl;
    
    if (sc->end_condition != NIL && 
	((forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) < 0) ||
	 (!forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
	return 0;
    }
    ret = db_match_dbterm(tb, sc->p,sc->mp, &this->dbterm, &hp, 2);
    if (is_value(ret)) {
	sc->accum = CONS(hp, ret, sc->accum);
    }
    if (MBUF(sc->p)) {
	/*
	 * Force a trap and GC if a heap fragment was created. Many heap fragments
	 * make the GC slow.
	 */
	sc->max = 0;
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
			     int forward)
{
    struct select_count_context *sc = (struct select_count_context *) ptr;
    Eterm ret;

    sc->lastobj = this->dbterm.tpl;
    
    /* Always backwards traversing */
    if (sc->end_condition != NIL && 
	(cmp_partly_bound(sc->end_condition, 
			  GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) {
	return 0;
    }
    ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
    if (ret == am_true) {
	++(sc->got);
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
			     int forward)
{
    struct select_context *sc = (struct select_context *) ptr;
    Eterm ret;
    Eterm* hp;

    sc->lastobj = this->dbterm.tpl;
    
    if (sc->end_condition != NIL && 
	((forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) < 0) ||
	 (!forward && 
	  cmp_partly_bound(sc->end_condition, 
			   GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
	return 0;
    }

    ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, &hp, 2);
    if (is_value(ret)) {
	++(sc->got);
	sc->accum = CONS(hp, ret, sc->accum);
    }
    if (MBUF(sc->p)) {
	/*
	 * Force a trap and GC if a heap fragment was created. Many heap fragments
	 * make the GC slow.
	 */
	sc->max = 0;
    }
    if (--(sc->max) <= 0 || sc->got == sc->chunk_size) {
	return 0;
    }
    return 1;
}


static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
			      int forward)
{
    struct select_delete_context *sc = (struct select_delete_context *) ptr;
    Eterm ret;
    Eterm key;

    if (sc->erase_lastterm)
	free_term((DbTable*)tb, sc->lastterm);
    sc->erase_lastterm = 0;
    sc->lastterm = this;
    
    if (sc->end_condition != NIL && 
	cmp_partly_bound(sc->end_condition, 
			 GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)
	return 0;
    ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
    if (ret == am_true) {
	key = GETKEY(sc->tb, this->dbterm.tpl);
	linkout_tree(sc->tb, sc->root, key, sc->stack);
	sc->erase_lastterm = 1;
	++sc->accum;
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr,
                               int forward)
{
    struct select_replace_context *sc = (struct select_replace_context *) ptr;
    Eterm ret;

    sc->lastobj = (*this)->dbterm.tpl;

    /* Always backwards traversing */
    if (sc->end_condition != NIL &&
	(cmp_partly_bound(sc->end_condition,
			  GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) {
	return 0;
    }
    ret = db_match_dbterm(tb, sc->p, sc->mp, &(*this)->dbterm, NULL, 0);

    if (is_value(ret)) {
        TreeDbTerm* new;
        TreeDbTerm* old = *this;
#ifdef DEBUG
        Eterm key = db_getkey(tb->keypos, ret);
        ASSERT(is_value(key));
        ASSERT(cmp_key(tb, key, old) == 0);
#endif
        new = new_dbterm(tb, ret);
        new->left = old->left;
        new->right = old->right;
        new->balance = old->balance;
        sc->lastobj = new->dbterm.tpl;
        *this = new;
        free_term((DbTable*)tb, old);
        ++(sc->replaced);
    }
    if (--(sc->max) <= 0) {
	return 0;
    }
    return 1;
}

#ifdef TREE_DEBUG
static void do_dump_tree2(DbTableCommon* tb, int to, void *to_arg, int show,
			  TreeDbTerm *t, int offset)
{
    if (t == NULL)
	return;
    do_dump_tree2(tb, to, to_arg, show, t->right, offset + 4);
    if (show) {
	const char* prefix;
	Eterm term;
	if (tb->compress) {
	    prefix = "key=";
	    term = GETKEY(tb, t->dbterm.tpl);
	}
	else {
	    prefix = "";
	    term = make_tuple(t->dbterm.tpl);
	}
	erts_print(to, to_arg, "%*s%s%T (addr = %p, bal = %d)\n",
		   offset, "", prefix, term, t, t->balance);
    }
    do_dump_tree2(tb, to, to_arg, show, t->left, offset + 4); 
}

#endif

#ifdef HARDDEBUG

/*
 * No called, but kept as it might come to use
 */
void db_check_table_tree(DbTable *tbl)
{
    DbTableTree *tb = &tbl->tree;
    check_table_tree(tb, tb->root);
    check_saved_stack(tb);
    check_slot_pos(tb);
}

static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to)
{
    TreeDbTerm *tmp;
    if (t == NULL) 
	return NULL;
    tmp = traverse_until(t->left, current, to);
    if (tmp != NULL)
	return tmp;
    ++(*current);
    if (*current == to)
	return t;
    return traverse_until(t->right, current, to);
}

static void check_slot_pos(DbTableTree *tb)
{
    int pos = 0;
    TreeDbTerm *t;
    if (tb->stack.slot == 0 || tb->stack.pos == 0)
	return;
    t = traverse_until(tb->root, &pos, tb->stack.slot);
    if (t != tb->stack.array[tb->stack.pos - 1]) {
	erts_fprintf(stderr, "Slot position does not correspont with stack, "
		   "element position %d is really 0x%08X, when stack says "
		   "it's 0x%08X\n", tb->stack.slot, t, 
		   tb->stack.array[tb->stack.pos - 1]);
	do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
    }
}
	

static void check_saved_stack(DbTableTree *tb)
{
     TreeDbTerm *t = tb->root;
     DbTreeStack* stack = &tb->static_stack;
     int n = 0;
     if (stack->pos == 0)
	 return;
     if (t != stack->array[0]) {
	 erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
		      stack->array[0], t);
	 do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
	 return;
     }
     while (n < stack->pos) {
	 if (t == NULL) {
	     erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
			" stack depth is %d\n", n);
	     do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
	     return;
	 }
	 n++;
	 if (n < stack->pos) {
	     if (stack->array[n] == t->left)
		 t = t->left;
	     else if (stack->array[n] == t->right)
		 t = t->right;
	     else {
		 erts_fprintf(stderr, "tb->stack[%d] == 0x%08X does not "
			    "represent child pointer in tree!"
			    "(left == 0x%08X, right == 0x%08X\n", 
			    n, tb->stack[n], t->left, t->right);
		 do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
		 return;
	     }
	 }
     }
}

static int check_table_tree(DbTableTree* tb, TreeDbTerm *t)
{
    int lh, rh;
    if (t == NULL)
	return 0;
    lh = check_table_tree(tb, t->left);
    rh = check_table_tree(tb, t->right);
    if ((rh - lh) != t->balance) {
	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
	erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
		     t->balance, t->left, t->right);
	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
	do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, t, 0);
	erts_fprintf(stderr,"\n---------------------------------\n");
    }
    return ((rh > lh) ? rh : lh) + 1;
}
	
#endif