aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
blob: 2d213baa25cba4363ec0bafc6cbbc0fae66fc171 (plain) (tree)

















































                                                                           
                                             
   





























                                                                      





                       



                       


                                                                 






















                                                                         

                                                                    
                                                                      
                                                                                 

                                                                      
                                                                               
                                                              

                                                                    
                                                                    

                                                                          
                                                                       

                                                              
                                                                      

                                                                           
                                                                        

                                                              
                                                                      

                                                                            







                                                                     





                                                                                      




                                                                   





                                                                  










                                                            



























                                      
                                                        












                                  
                                                   
                                                    

                                                       


                                                           


                                                             




                                                       
                                                         

                                                         




                                                             
                                                                              

                                                                                                                      
                                                                            

                                                                                                                     


                                 
                                                                                    

                                                                                                                            
                                                                                  

                                                                                                                           
 




















                                                                                                   
                                               
                                                




                                                             












                                                                       
                                                














                                                                     
                                                  








                                                  
                                                                





















































































                                                                      
                                         



















                                                          
                                                               




































































































































































                                                                           
                   


                               
                   

                                
                                        





















                                                                 
                    


                               
                   

                               
                   





























                                                                
            
                                   
      
                                






                                     

                                                                      
 

                                                                 

               

                                 
                                                                             

              
                                                                             



              

                                           
 









































                                                                      









                                                           
                                                           
 

                                               






                                                                    
                                                  
 

                                                  

                                            
                                                                               
            
                                                                               



                  
                                                    
 




                                                 
                                                     


                                                       
                                                         
                                          

                                                                    
                                                                      
                                      
     
                                                                      

                                                                       

          
                                
     


                  
                                                  
 
                                    












                                                                               


                  
                                                            
 

                                                


                  
                                                   
 

                                             


                  
                                                     
 

                                               

 
                  




























                                                                  






                                                                    
                            



                                                  


                                                      



                                   
          
                                   


                                          







                                                    
                                                        



                                                                       




                                              




                                                    

                                            
                                                       
     
 
 






                              

                                                               

                                                              



                               
                                 



                                 





                                           
                

 









                                                                           
                                             
     

                     
 




                                                                           
 





                                                 
     

                     
 
                             
                               
     
                             
      
 


                                                             
                                                             
                                                            
 
                         
                                                               
                                                          
                                          
 
                        
                          



                                                             
 
                                                    
                                                  
                            
                                                     

                                                                      

                           

 

                                                        
                                                         


                                      
                         


                                           

                                               
 
                                        
                                    



                                                                      
                                                   



                                                                










                                                                               
                                                               

                                                            
             

 
                                         
 



                                                     

 
                                                                          
 
                            
                                                       







                                                     
                                       







                                                                           

 
 

                                                   

                                              


                                      
                                   
 


                                                   
 



                                              
                
                                              

         
                

































                                            
                     










                                             
                     

 









                                              
                                          

                                                  







                                       
                                    
                                

               



                                                              
                                        
                                        
                                    
                   
                                                
                                        

                                        


                                    


                                          
                           



                                               

                                                
                                             

                                                                    
                                  


                                                          
                    
                                                           




                                           


                                                                         
                                                              
             

                                                
                    
                                                                         


                                


                                                              
                                        
                                        
                                    
                   
                                                
                                        

                                        


                                    


                                          
                           



                                               

                                                



                                             
                                                                    
                                  


                                                          
                    
                                                         




                                           


                                                                        
                                                              
             

                                               
                    

                                                           



                                                                            



                                                       
            
                                                      
     

                                
                                  

                                             


                                                                       

                                            

                                                  
                                              

                                            

                                                      
                                              

 
                                           


                                                                 
                                  


                                               
 
                                                    
                                                         
                                        
                                

               





                                                             

                                                     


                                                

                                                                 
                             


                                             
                
                                              
         
                                  
                                
                                          
                                                

                                                      
                                                  


     



                                                       
   
                                                                    
 




                                                     
 



                                    
     
 













                                                                        
         



















































                                                                         
                          



                 



                       

 







                                                                   
 
                  






                                        
              






















                                           
             


                                           

         

                                  


















                                                              

                            
                                      
                     
                                


                                                            





                                                                

                            
               



                                               
                                                             
                               
     


                                                           
                                 




                                                                          







                                               
                                               




                                                                                        
                  
 
                                                   

                    
                                 




                                                               






                                               
                                                             




                                                          
                                 
                  





                                                                          

                            
               


                                               
                                               






                                                                               
                                                   

                    
                                 


                  



                                                                     





                                                                         

 


                                                                         



                                                                  
                                
                  

 


                                                                  
                                 


                                                        

                                                                            


                                    
                                   

 
                                                                      






                           

                                           








                                                                    
                                                           

 


                                                                

            

                                               

                            
                              
                                       
                                           
                      
 
                             

                                                                    
                                    
     
 
                                



                                       

                          

                                     

                            

                                                        






                                                
                                                        






                                                
                                         
                                                               
                                    
                                           



                                                                           
                                          





                                            
                                                          
                                            
         


                                              

                                    


     
                                                                         
 
                                                    

 
                                                                         
 
                                                    

 









                                                                             





                                                                        
                              
                                       
                      




                                       

                          

                                     

                            
                                                                          






                                            
                                                               
                                    



                                                                       








                                    




















































                                                                             







                                                                        
                      


                                       
                          

                                     
                            



                                                                      
                                                                        



                                                                       



















                                                             


                                                                



                                                                  
                                
                  

 



                                                               



                                                                  
                                
                  

 


                                                               





                                                                        

 



                                                                         








                                                                        





                                                      
               




                                                                        
                                 





                                                        

                                                                 
 
               




                                                                            
                                 


                  
                                                                

                                                                   
 
               




                                                                      
                                 





                                                              

                                                                       
 
               





                                                                          
                                 



                                                                      

                                                            
 
               




                                                                         
                                 




                                                                      

                                                           
 
               





                                                                                 
                                 





                                                               

                                                                        
 


                                         


                                               
                                            

                                                                             
                                 



                                                                       

                                                             
 


                                         


                                               
                                            


                                                                    
                                 



                                                                        

                                                               
 
               




                                                                           
                                 



                                                                      

                                                                            
 
               




                                                                              
                                 


                  


                                                                          





                                                                        










                                                             






                                                           
                                                  
                   
                                 









                                                                  


























                                                                             
 















                                                                  


                                                    


                                                           
                             



                
















                                                                  



                                                                         
                                     


                            




                                              


                                                         
                                                  
                   
                                 
 
                                                





                                                                                  



                                                                           
                   
                                                                         

                                                   


                                                           






                                                                       
                                                       


                                                            
































                                                                                      







                                                     
                                                                  
                                                  
                   
                                 






                                                    







                                                                 



































                                                                        
 



























                                                                                























                                                                            
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Implementation of ETS ordered_set table type with
 *              fine-grained synchronization.
 *
 * Author: 	Kjell Winblad
 *
 * This implementation is based on the contention adapting search tree
 * (CA tree). The CA tree is a concurrent data structure that
 * dynamically adapts its synchronization granularity based on how
 * much contention is detected in locks. The following publication
 * contains a detailed description of CA trees:
 * 
 * A Contention Adapting Approach to Concurrent Ordered Sets
 * Journal of Parallel and Distributed Computing, 2018
 * Kjell Winblad and Konstantinos Sagonas
 * https://doi.org/10.1016/j.jpdc.2017.11.007
 *
 * The following publication may also be interesting as it discusses
 * how the CA tree can be used as an ETS ordered_set table type
 * backend:
 *
 * More Scalable Ordered Set for ETS Using Adaptation
 * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
 * Kjell Winblad and Konstantinos Sagonas
 * https://doi.org/10.1145/2633448.2633455
 *
 * This implementation of the ordered_set ETS table type is only
 * activated when the options {write_concurrency, true}, public and
 * ordered_set are passed to the ets:new/2 function. This
 * implementation is expected to scale better than the default
 * implementation located in "erl_db_tree.c".
 * 
 * The default implementation has a static stack optimization (see
 * get_static_stack in erl_db_tree.c). This implementation does not
 * have such an optimization because it scales poorly when concurrent
 * read operations are frequent (they all try to get hold of the same
 * stack). The default implementation may thus perform better than
 * this implementation in scenarios where the static stack
 * optimization is useful. One such scenario is when only one process
 * is accessing the table and this process is traversing the table
 * with a sequence of next/2 calls.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "erl_binary.h"

#include "erl_db_catree.h"
#include "erl_db_tree.h"
#include "erl_db_tree_util.h"

/*
 * IF_DEBUG(X) expands to X when DEBUG is defined and to nothing
 * otherwise, so that debug-only code can be written inline without
 * surrounding it with #ifdef DEBUG.
 */
#ifdef DEBUG
#  define IF_DEBUG(X) X
#else
#  define IF_DEBUG(X)
#endif

/*
** Forward declarations
*/

/*
 * Helper defined outside this part of the file.
 * NOTE(review): judging by the name and the 'num_left' argument this
 * presumably continues a reduction-limited deletion of a base node -
 * confirm against the definition.
 */
static SWord do_delete_base_node_cont(DbTableCATree *tb,
                                    DbTableCATreeNode *base_node,
                                    SWord num_left);

/*
 * Method interface functions.
 *
 * These are the functions that are installed in the 'db_catree'
 * method table below.
 */
static int db_first_catree(Process *p, DbTable *tbl,
                           Eterm *ret);
static int db_next_catree(Process *p, DbTable *tbl,
                          Eterm key, Eterm *ret);
static int db_last_catree(Process *p, DbTable *tbl,
                          Eterm *ret);
static int db_prev_catree(Process *p, DbTable *tbl,
                          Eterm key,
                          Eterm *ret);
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail);
static int db_get_catree(Process *p, DbTable *tbl,
                         Eterm key,  Eterm *ret);
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_catree(Process *p, DbTable *tbl,
                                 Eterm key,int ndex,
                                 Eterm *ret);
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term,  Eterm *ret);
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reversed, Eterm *ret,
                            enum DbIterSafety);
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern,  Eterm *ret, enum DbIterSafety);
static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret, enum DbIterSafety);
static int db_select_continue_catree(Process *p, DbTable *tbl,
                                     Eterm continuation, Eterm *ret,
                                     enum DbIterSafety*);
static int db_select_count_continue_catree(Process *p, DbTable *tbl,
                                           Eterm continuation, Eterm *ret,
                                           enum DbIterSafety*);
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern,  Eterm *ret,
                                   enum DbIterSafety);
static int db_select_delete_continue_catree(Process *p, DbTable *tbl, 
                                            Eterm continuation, Eterm *ret,
                                            enum DbIterSafety*);
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret,
                                    enum DbIterSafety);
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret,
                                             enum DbIterSafety*);
static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_catree(fmtfn_t to, void *to_arg,
                            int show, DbTable *tbl);
static int db_free_table_catree(DbTable *tbl);
static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
static void db_foreach_offheap_catree(DbTable *,
                                      void (*)(ErlOffHeap *, void *),
                                      void *);
static SWord db_delete_all_objects_catree(Process* p,
                                          DbTable* tbl,
                                          SWord reds,
                                          Eterm* nitems_holder_wb);
static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
                                                                 Eterm nitems_holder);
static int
db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
                        DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);

/*
 * Adaptation (split/join) of the CA tree. Both are called from
 * wunlock_adapt_base_node(); their definitions are not in this part
 * of the file.
 */
static void split_catree(DbTableCATree *tb,
                         DbTableCATreeNode* ERTS_RESTRICT base,
                         DbTableCATreeNode* ERTS_RESTRICT parent);
static void join_catree(DbTableCATree *tb,
                        DbTableCATreeNode *thiz,
                        DbTableCATreeNode *parent);

/*
 * Base node locking helpers that are used (by do_random_join()) before
 * their definitions further down.
 */
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node);
static ERTS_INLINE
void wunlock_base_node(DbTableCATreeNode *base_node);
static ERTS_INLINE
void wlock_base_node_no_stats(DbTableCATreeNode *base_node);
static ERTS_INLINE
void wunlock_adapt_base_node(DbTableCATree* tb,
                             DbTableCATreeNode* node,
                             DbTableCATreeNode* parent,
                             int current_level);
/*
** External interface
*/
/*
 * Method table for the CA tree based ordered_set implementation.
 *
 * The initializer is positional: every entry is matched to a member of
 * DbTableMethod purely by its position, so the order below has to follow
 * the member order of the DbTableMethod declaration (which is not part
 * of this file). Note that some entries intentionally differ from the
 * order of the forward declarations above (e.g. db_get_element_catree
 * before db_member_catree, db_select_chunk_catree before
 * db_select_catree).
 */
DbTableMethod db_catree =
{
    db_create_catree,
    db_first_catree,
    db_next_catree,
    db_last_catree,
    db_prev_catree,
    db_put_catree,
    db_get_catree,
    db_get_element_catree,
    db_member_catree,
    db_erase_catree,
    db_erase_object_catree,
    db_slot_catree,
    db_select_chunk_catree,
    db_select_catree,
    db_select_delete_catree,
    db_select_continue_catree,
    db_select_delete_continue_catree,
    db_select_count_catree,
    db_select_count_continue_catree,
    db_select_replace_catree,
    db_select_replace_continue_catree,
    db_take_catree,
    db_delete_all_objects_catree,
    db_delete_all_objects_get_nitems_from_holder_catree,
    db_free_table_catree,
    db_free_table_continue_catree,
    db_print_catree,
    db_foreach_offheap_catree,
    db_lookup_dbterm_catree,
    db_finalize_dbterm_catree

};

/*
 * Constants
 */

/*
 * Added to a base node's lock statistics when a lock attempt found the
 * lock busy (see wlock_base_node() and rlock_base_node()).
 */
#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 250
/*
 * Added to a base node's lock statistics when the write lock was taken
 * without contention (see wlock_base_node()).
 */
#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
/*
 * Added to the statistics of the base node that do_random_join() picks.
 */
#define ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION (-500)
/*
 * Mask applied to a 32-bit random number in
 * do_random_join_with_low_probability(); do_random_join() is only called
 * when all 9 masked bits are zero (about 1 time in 512 if the random
 * numbers are uniformly distributed).
 */
#define ERL_DB_CATREE_LOCK_GRAVITY_PATTERN (0xFF800000)
/*
 * NOTE(review): not referenced in this part of the file; presumably the
 * contribution used by operations that involve more than one base node -
 * confirm against its users.
 */
#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
/*
 * Lock statistics above the high limit make wunlock_adapt_base_node()
 * split the base node; statistics below the low limit make it join the
 * base node with a neighbor.
 */
#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
/*
 * wunlock_adapt_base_node() does not split a base node whose level is
 * not below this value.
 */
#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 16
/*
 * BASE_NODE_STAT_SUB() leaves the statistics untouched once they are at
 * or below the low limit, and BASE_NODE_STAT_ADD() leaves them untouched
 * once they are at or above the high limit.
 */
#define ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT (-20000)
#define ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT (20000)

/*
 * Internal CA tree related helper functions and macros
 */

/*
 * Field accessors for CA tree nodes.
 *
 * The node argument is parenthesized in every expansion so that any
 * pointer valued expression (for example 'nodes + 1' or '*node_ptr'),
 * and not only a plain identifier, can be passed safely.
 */
#define GET_ROUTE_NODE_KEY(node) ((node)->u.route.key.term)
#define GET_BASE_NODE_LOCK(node) (&((node)->u.base.lock))
#define GET_ROUTE_NODE_LOCK(node) (&((node)->u.route.lock))


/* Helpers for reading and writing shared atomic variables */

/*
 * No memory barrier.
 *
 * NOTE: SET_LEFT and SET_RIGHT (and their _RELB variants below) expand
 * to a complete statement including the terminating ';'. That is kept
 * as is for compatibility with existing call sites; always use them as
 * a statement of their own inside braces.
 */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&((tb)->root)))
#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&((ca_tree_route_node)->u.route.left)))
#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&((ca_tree_route_node)->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&((ca_tree_route_node)->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&((ca_tree_route_node)->u.route.right), (erts_aint_t)(v));


/* Release (SET_*_RELB) or acquire (GET_*_ACQB) barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((tb)->root)))
#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((ca_tree_route_node)->u.route.left)))
#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&((ca_tree_route_node)->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&((ca_tree_route_node)->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&((ca_tree_route_node)->u.route.right), (erts_aint_t)(v));

/* Change base node lock statistics */
/*
 * BASE_NODE_STAT_SET/READ write and read the lock statistics counter of
 * a base node (no memory barrier).
 *
 * BASE_NODE_STAT_ADD adds a positive VALUE unless the counter already is
 * at or above ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT, and
 * BASE_NODE_STAT_SUB adds a negative VALUE unless the counter already is
 * at or below ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT.
 *
 * The update is a separate read followed by a write and not an atomic
 * read-modify-write, so concurrent updates may be lost.
 *
 * NODE and VALUE are parenthesized in the expansions so that arbitrary
 * expressions (e.g. 'a ? b : c') can be passed. VALUE may be evaluated
 * more than once and must therefore be free of side effects.
 *
 * NOTE: the ADD and SUB macros expand to a complete statement including
 * the terminating ';' (kept for compatibility with existing call sites);
 * always use them as a statement of their own inside braces.
 */
#define BASE_NODE_STAT_SET(NODE, VALUE) erts_atomic_set_nob(&(NODE)->u.base.lock_statistics, VALUE)
#define BASE_NODE_STAT_READ(NODE) erts_atomic_read_nob(&(NODE)->u.base.lock_statistics)
#define BASE_NODE_STAT_ADD(NODE, VALUE)                                 \
    do {                                                                \
        Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
        ASSERT((VALUE) > 0);                                            \
        if(v < ERL_DB_CATREE_LOCK_HIGH_NO_CONTRIBUTION_LIMIT) {          \
            erts_atomic_set_nob(&((NODE)->u.base.lock_statistics), v + (VALUE)); \
        }                                                               \
    }while(0);
#define BASE_NODE_STAT_SUB(NODE, VALUE)                                 \
    do {                                                                \
        Sint v = erts_atomic_read_nob(&((NODE)->u.base.lock_statistics)); \
        ASSERT((VALUE) < 0);                                            \
        if(v > ERL_DB_CATREE_LOCK_LOW_NO_CONTRIBUTION_LIMIT) {          \
            erts_atomic_set_nob(&((NODE)->u.base.lock_statistics), v + (VALUE)); \
        }                                                               \
    }while(0);


/*
 * Compares 'key' with the routing key stored in the route node 'obj'
 * and returns the result of CMP(). find_base_node() treats a negative
 * result as "continue in the left branch".
 */
static ERTS_INLINE Sint cmp_key_route(Eterm key,
                                      DbTableCATreeNode *obj)
{
    const Eterm route_key = GET_ROUTE_NODE_KEY(obj);

    return CMP(key, route_key);
}

/*
 * Returns non-zero if the tree rooted at 'root' holds fewer than two
 * elements, that is, if the tree is empty or if the root has no
 * children.
 */
static ERTS_INLINE
int less_than_two_elements(TreeDbTerm *root)
{
    if (root == NULL) {
        return 1; /* Empty tree */
    }
    /* A root without children is the only element of the tree */
    return !(root->left != NULL || root->right != NULL);
}

/*
 * Inserts a TreeDbTerm into a tree. Returns the new root.
 *
 * tb              - table, used for extracting (GETKEY) and comparing
 *                   (cmp_key) keys
 * insert_to_root  - root of the AVL tree to insert into (may be NULL)
 * value_to_insert - node to link into the tree; its 'balance', 'left'
 *                   and 'right' fields are overwritten
 *
 * The function does not look for an already existing node with an equal
 * key: a key that does not compare less than a visited node is always
 * placed in that node's right subtree.
 *
 * NOTE(review): the path stacks have room for STACK_NEED entries and no
 * bounds check is made here; presumably STACK_NEED is large enough for
 * any possible tree height - confirm against its definition.
 */
static ERTS_INLINE
TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
                              TreeDbTerm *insert_to_root,
                              TreeDbTerm *value_to_insert) {
    /* Non recursive insertion in AVL tree, building our own stack */
    TreeDbTerm **tstack[STACK_NEED]; /* links followed on the way down */
    int tpos = 0;
    int dstack[STACK_NEED+1]; /* directions taken (+1 for DIR_END) */
    int dpos = 0;
    int state = 0; /* non-zero while a subtree height increase has to be
                      propagated towards the root */
    TreeDbTerm * base = insert_to_root;
    TreeDbTerm **this = &base;
    Sint c;
    Eterm key;
    int dir;
    TreeDbTerm *p1, *p2, *p;

    key = GETKEY(tb, value_to_insert->dbterm.tpl);

    /* DIR_END marks the bottom of the direction stack */
    dstack[dpos++] = DIR_END;
    for (;;)
	if (!*this) { /* Found our place */
	    state = 1;
	    *this = value_to_insert;
	    (*this)->balance = 0;
	    (*this)->left = (*this)->right = NULL;
	    break;
	} else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
	    /* go left */
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else { /* go right (also when the keys compare equal) */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	}

    /*
     * Walk back towards the root and update the balance factors (and
     * rotate if needed) for as long as the subtree below has grown.
     */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	p = *this;
	if (dir == DIR_LEFT) {
	    switch (p->balance) {
	    case 1:
		/* Was right heavy, now balanced; height unchanged */
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		/* Now left heavy; height increased, continue upwards */
		p->balance = -1;
		break;
	    case -1: /* The icky case */
		p1 = p->left;
		if (p1->balance == -1) { /* Single LL rotation */
		    p->left = p1->right;
		    p1->right = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double LR rotation */
		    p2 = p1->right;
		    p1->right = p2->left;
		    p2->left = p1;
		    p->left = p2->right;
		    p2->right = p;
		    p->balance = (p2->balance == -1) ? +1 : 0;
		    p1->balance = (p2->balance == 1) ? -1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	} else { /* dir == DIR_RIGHT */
	    switch (p->balance) {
	    case -1:
		/* Was left heavy, now balanced; height unchanged */
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		/* Now right heavy; height increased, continue upwards */
		p->balance = 1;
		break;
	    case 1:
		p1 = p->right;
		if (p1->balance == 1) { /* Single RR rotation */
		    p->right = p1->left;
		    p1->left = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RL rotation */
		    p2 = p1->left;
		    p1->left = p2->right;
		    p2->right = p1;
		    p->right = p2->left;
		    p2->left = p;
		    p->balance = (p2->balance == 1) ? -1 : 0;
		    p1->balance = (p2->balance == -1) ? 1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	}
    }
    return base;
}

/*
 * Splits an AVL tree into two trees.
 *
 * The node that holds the "split key" is stored in the write back
 * parameter split_key_node_wb. The tree with the keys that are smaller
 * than the split key is stored in left_wb and the tree with the
 * remaining keys (including the split key node itself) is stored in
 * right_wb.
 *
 * 'root' must not be NULL. Normally the root node provides the split
 * key: its left subtree becomes the left tree and the root is inserted
 * into its former right subtree, which becomes the right tree. If the
 * root has no left subtree the root alone becomes the left tree instead
 * (so that the left tree is not empty) and its right child provides the
 * split key; in a valid AVL tree that right subtree holds at most one
 * node.
 */
static void split_tree(DbTableCATree *tb,
                       TreeDbTerm *root,
                       TreeDbTerm **split_key_node_wb,
                       TreeDbTerm **left_wb,
                       TreeDbTerm **right_wb) {
    TreeDbTerm *smaller_keys;
    TreeDbTerm *larger_keys;

    if (root->left == NULL) { /* To get non empty split */
        TreeDbTerm *rest = root->right;

        *right_wb = rest;
        *split_key_node_wb = rest;
        /* The root becomes a tree of its own */
        root->right = NULL;
        root->balance = 0;
        *left_wb = root;
        return;
    }

    /* Detach both subtrees from the root ... */
    smaller_keys = root->left;
    larger_keys = root->right;
    root->left = NULL;
    root->right = NULL;
    /* ... and put the root itself into the tree with the larger keys */
    larger_keys = insert_TreeDbTerm(tb, larger_keys, root);

    *split_key_node_wb = root;
    *left_wb = smaller_keys;
    *right_wb = larger_keys;
}

/*
 * Used by the join_trees function.
 *
 * Returns the height (number of levels) of the AVL tree 'root'; an
 * empty tree has height 0. Only one path is followed: from every inner
 * node the walk continues into the left subtree if the balance factor
 * is -1 and into the right subtree otherwise.
 */
static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
{
    TreeDbTerm *node = root;
    int levels;

    if (node == NULL) {
        return 0;
    }
    for (levels = 1; node->left != NULL || node->right != NULL; levels++) {
        node = (node->balance == -1) ? node->left : node->right;
    }
    return levels;
}

/*
 * Used by the join_trees function.
 *
 * Unlinks the node with the smallest key (is_min != 0) or the largest
 * key (is_min == 0) from the tree that '*root' points to and returns
 * the unlinked node. '*root' is updated if the root of the tree changes.
 * Returns NULL if the tree is empty.
 *
 * The 'left', 'right' and 'balance' fields of the returned node are not
 * reset by this function.
 *
 * NOTE(review): the path stacks have room for STACK_NEED entries and no
 * bounds check is made here - confirm that STACK_NEED covers the
 * maximum tree height.
 */
static ERTS_INLINE
TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
{
    TreeDbTerm **tstack[STACK_NEED]; /* links followed on the way down */
    int tpos = 0;
    int dstack[STACK_NEED+1]; /* directions taken (+1 for DIR_END) */
    int dpos = 0;
    int state = 0; /* non-zero while rebalancing has to continue upwards */
    TreeDbTerm **this = root;
    int dir;
    TreeDbTerm *q = NULL;

    /* DIR_END marks the bottom of the direction stack */
    dstack[dpos++] = DIR_END;
    for (;;) {
        if (!*this) { /* Failure */
            return NULL;
        } else if (is_min && (*this)->left != NULL) {
            /* Looking for the minimum: keep going left */
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
        } else if (!is_min && (*this)->right != NULL) {
            /* Looking for the maximum: keep going right */
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
        } else { /* Min (or max) node found, this is the one to splice out */
            q = (*this);
            /* q has at most one child; that child (or NULL) replaces q */
            if (q->right == NULL) {
                (*this) = q->left;
                state = 1;
            } else if (q->left == NULL) {
                (*this) = q->right;
                state = 1;
            }
            break;
        }
    }
    /*
     * Rebalance on the way back up. tree_balance_left() and
     * tree_balance_right() are defined outside this file; their return
     * value decides whether the walk continues (presumably non-zero
     * means that the subtree height decreased - confirm).
     */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        if (dir == DIR_LEFT) {
            state = tree_balance_left(this);
        } else {
            state = tree_balance_right(this);
        }
    }
    return q;
}

/*
 * Unlink and return the node with the smallest (MIN) or the largest
 * (MAX) key of the tree that '*root' points to; 'root' is a
 * 'TreeDbTerm **'. See linkout_min_or_max_tree_node().
 */
#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)

/*
 * Joins two AVL trees where all the keys in the left one are smaller
 * than the keys in the right one and returns the resulting tree.
 *
 * If one of the trees is empty the other one is returned as is.
 * Otherwise a joining node is unlinked from the lower of the two trees
 * (its smallest node if that is the right tree, its largest node if
 * that is the left tree). The higher tree is then descended along the
 * edge that faces the lower tree until a subtree of about the same
 * height as the lower tree is found. The joining node takes the place
 * of that subtree and gets the subtree and the lower tree as children.
 * Finally the balance factors on the path back to the root are repaired
 * in the same way as after an insertion.
 *
 * The algorithm is described on page 474 in D. E. Knuth. The Art of
 * Computer Programming: Sorting and Searching,
 * vol. 3. Addison-Wesley, 2nd edition, 1998.
 *
 * NOTE(review): the path stacks have room for STACK_NEED entries and no
 * bounds check is made here - confirm that STACK_NEED covers the
 * maximum tree height.
 */
static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
                               TreeDbTerm *right_root_param)
{
    TreeDbTerm **tstack[STACK_NEED]; /* links followed on the way down */
    int tpos = 0;
    int dstack[STACK_NEED+1]; /* directions taken (+1 for DIR_END) */
    int dpos = 0;
    int state = 1; /* the joined subtree is one level higher than the
                      subtree it replaces, so start in "grown" state */
    TreeDbTerm **this;
    int dir;
    TreeDbTerm *p1, *p2, *p;
    TreeDbTerm *left_root = left_root_param;
    TreeDbTerm *right_root = right_root_param;
    int left_height;
    int right_height;
    int current_height;
    /* DIR_END marks the bottom of the direction stack */
    dstack[dpos++] = DIR_END;
    if (left_root == NULL) {
        return right_root;
    } else if (right_root == NULL) {
        return left_root;
    }

    left_height = compute_tree_hight(left_root);
    right_height = compute_tree_hight(right_root);
    if (left_height >= right_height) {
        /* The smallest node of the right tree becomes the joining node */
        TreeDbTerm * new_root =
            LINKOUT_MIN_TREE_NODE(&right_root);
        /* right_root may have become lower (or NULL) by the unlinking */
        int new_right_height = compute_tree_hight(right_root);
        TreeDbTerm * current_node = left_root;
        this = &left_root;
        current_height = left_height;
        /*
         * Go down the right edge of the left tree until the current
         * subtree is at most one level higher than the right tree.
         */
        while(current_height > new_right_height + 1) {
            /*
             * The right subtree is two levels lower than the current
             * node if the node is left heavy, otherwise one level lower.
             */
            if (current_node->balance == -1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
            current_node = current_node->right;
        }
        /* Put the joining node in the place of the subtree found */
        new_root->left = current_node;
        new_root->right = right_root;
        new_root->balance = new_right_height - current_height;
        *this = new_root;
    } else {
        /* This case is symmetric to the previous case */
        TreeDbTerm * new_root =
            LINKOUT_MAX_TREE_NODE(&left_root);
        int new_left_height = compute_tree_hight(left_root);
        TreeDbTerm * current_node = right_root;
        this = &right_root;
        current_height = right_height;
        while (current_height > new_left_height + 1) {
            if (current_node->balance == 1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
            current_node = current_node->left;
        }
        new_root->right = current_node;
        new_root->left = left_root;
        new_root->balance = current_height - new_left_height;
        *this = new_root;
    }
    /* Now we need to continue as if this was during the insert */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        p = *this;
        if (dir == DIR_LEFT) {
            switch (p->balance) {
            case 1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = -1;
                break;
            case -1: /* The icky case */
                p1 = p->left;
                if (p1->balance == -1) { /* Single LL rotation */
                    p->left = p1->right;
                    p1->right = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double LR rotation */
                    p2 = p1->right;
                    p1->right = p2->left;
                    p2->left = p1;
                    p->left = p2->right;
                    p2->right = p;
                    p->balance = (p2->balance == -1) ? +1 : 0;
                    p1->balance = (p2->balance == 1) ? -1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        } else { /* dir == DIR_RIGHT */
            switch (p->balance) {
            case -1:
                p->balance = 0;
                state = 0;
                break;
            case 0:
                p->balance = 1;
                break;
            case 1:
                p1 = p->right;
                if (p1->balance == 1) { /* Single RR rotation */
                    p->right = p1->left;
                    p1->left = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double RL rotation */
                    p2 = p1->left;
                    p1->left = p2->right;
                    p2->right = p1;
                    p->right = p2->left;
                    p2->left = p;
                    p->balance = (p2->balance == 1) ? -1 : 0;
                    p1->balance = (p2->balance == -1) ? 1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        }
    }
    /* Return the joined tree */
    if (left_height >= right_height) {
        return left_root;
    } else {
        return right_root;
    }
}

#ifdef DEBUG
#  define PROVOKE_RANDOM_SPLIT_JOIN
#endif
#ifdef PROVOKE_RANDOM_SPLIT_JOIN
/*
 * Debug-only pseudo random number generator (a linear congruential
 * generator). Returns a value in the range 0..0x7FFF.
 *
 * The generator state is kept in an unsigned integer so that the
 * multiplication wraps around with well defined modular arithmetic;
 * with a signed 'int' the overflow would be undefined behavior.
 *
 * NOTE(review): the state is a plain static variable that is updated
 * without any synchronization, so concurrent callers race on it. This
 * only affects the quality of the numbers used for debug provocation.
 */
static int dbg_fastrand(void)
{
    static unsigned int g_seed = 648835;

    g_seed = 214013u * g_seed + 2531011u;
    return (int)((g_seed >> 16) & 0x7FFF);
}

/*
 * Debug aid: randomly pushes the lock statistics of 'base_node' towards
 * the high or the low contention limit.
 *
 * Nothing is done if the table has DB_CATREE_FORCE_SPLIT set or if it
 * does not have DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN set. Otherwise one
 * call out of eight (on average) raises the statistics and one out of
 * eight lowers them.
 */
static void dbg_provoke_random_splitjoin(DbTableCATree* tb,
                                         DbTableCATreeNode* base_node)
{
    int dice;

    if (tb->common.status & DB_CATREE_FORCE_SPLIT) {
        return;
    }
    if (!(tb->common.status & DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN)) {
        return;
    }

    dice = dbg_fastrand() % 8;
    if (dice == 1) {
        /* Raise the statistics */
        BASE_NODE_STAT_ADD(base_node, 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT);
    }
    else if (dice == 2) {
        /* Lower the statistics */
        BASE_NODE_STAT_SUB(base_node, -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT);
    }
}
#else
#  define dbg_provoke_random_splitjoin(T,N)
#endif /* PROVOKE_RANDOM_SPLIT_JOIN */

/*
 * Applies "gravity" to one randomly selected base node.
 *
 * The base node is found by walking down from the root; on level N the
 * walk continues to the left if bit N of 'rand' is clear and to the
 * right if it is set. ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION is then
 * added to the lock statistics of the node. If the statistics end up
 * outside of the low/high contention limits, the node has a parent and
 * the node can be write locked without waiting, the node is handed to
 * wunlock_adapt_base_node() (or is just unlocked if it has been
 * invalidated in the meantime).
 */
static ERTS_NOINLINE
void do_random_join(DbTableCATree* tb, Uint rand)
{
    DbTableCATreeNode* parent = NULL;
    DbTableCATreeNode* node;
    Sint stat;
    int level;

    for (node = GET_ROOT_ACQB(tb), level = 0; !node->is_base_node; level++) {
        const int go_right = ((rand & (1 << level)) != 0);

        parent = node;
        node = go_right ? GET_RIGHT_ACQB(node) : GET_LEFT_ACQB(node);
    }

    BASE_NODE_STAT_SUB(node, ERL_DB_CATREE_LOCK_GRAVITY_CONTRIBUTION);
    stat = BASE_NODE_STAT_READ(node);
    if (ERL_DB_CATREE_LOW_CONTENTION_LIMIT <= stat &&
        stat <= ERL_DB_CATREE_HIGH_CONTENTION_LIMIT) {
        return; /* No adaptation */
    }
    if (parent == NULL) {
        return; /* The root is a base node */
    }
    if (try_wlock_base_node(&node->u.base)) {
        return; /* The lock is busy; do not wait for it */
    }

    /* We hold the write lock */
    if (node->u.base.is_valid) {
        wunlock_adapt_base_node(tb, node, parent, level);
    } else {
        wunlock_base_node(node);
    }
}

/*
 * Calls do_random_join() with low probability.
 *
 * A 32-bit random number is obtained from erts_sched_local_random()
 * using 'seed'; do_random_join() is called with that number only if
 * all bits selected by ERL_DB_CATREE_LOCK_GRAVITY_PATTERN are zero.
 * The function does nothing if
 * ERTS_DB_CA_TREE_NO_RANDOM_JOIN_WITH_LOW_PROBABILITY is defined.
 */
static ERTS_INLINE
void do_random_join_with_low_probability(DbTableCATree* tb, Uint seed)
{
#ifndef ERTS_DB_CA_TREE_NO_RANDOM_JOIN_WITH_LOW_PROBABILITY
    const Uint32 rnd = erts_sched_local_random(seed);

    if ((rnd & ERL_DB_CATREE_LOCK_GRAVITY_PATTERN) != 0) {
        return;
    }
    do_random_join(tb, rnd);
#endif
}

/*
 * Tries to write lock a base node without waiting.
 *
 * NOTE: the return value is non-zero if the attempt FAILED because the
 * lock was busy (EBUSY) and zero otherwise.
 */
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
{
    return erts_rwmtx_tryrwlock(&(base_node->lock)) == EBUSY;
}

/*
 * Write locks a base node (waiting for the lock if necessary) and
 * leaves the lock statistics of the node untouched.
 */
static ERTS_INLINE
void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
{
    ASSERT(base_node->is_base_node);
    erts_rwmtx_rwlock(&(base_node->u.base.lock));
}

/*
 * Write locks a base node and records in the lock statistics of the
 * node whether the lock was contended:
 * ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION is applied if the lock could
 * be taken at once, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION if the lock
 * was busy and had to be waited for.
 */
static ERTS_INLINE
void wlock_base_node(DbTableCATreeNode *base_node)
{
    ASSERT(base_node->is_base_node);
    if (!try_wlock_base_node(&base_node->u.base)) {
        /* Got the lock at the first attempt */
        BASE_NODE_STAT_SUB(base_node, ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION);
        return;
    }
    /* The lock is contended; wait for it before touching the statistics */
    wlock_base_node_no_stats(base_node);
    BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
}

/*
 * Releases the write lock of a base node. No adaptation (split or join)
 * is attempted; see wunlock_adapt_base_node() for that.
 */
static ERTS_INLINE
void wunlock_base_node(DbTableCATreeNode *base_node)
{
    erts_rwmtx_rwunlock(&(base_node->u.base.lock));
}

/*
 * Finishes a write locked operation on the base node 'node': depending
 * on the lock statistics the node is joined with a neighbor, split in
 * two, or just unlocked.
 *
 *  - join_catree() is called if the statistics are below
 *    ERL_DB_CATREE_LOW_CONTENTION_LIMIT, or if the node is empty, has a
 *    parent and DB_CATREE_FORCE_SPLIT is not set for the table.
 *  - otherwise split_catree() is called if the statistics are above
 *    ERL_DB_CATREE_HIGH_CONTENTION_LIMIT and 'current_level' is below
 *    ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT.
 *  - otherwise the node is unlocked.
 *
 * The statistics are sampled before dbg_provoke_random_splitjoin() is
 * called, so a provoked change is not seen by this call.
 */
static ERTS_INLINE
void wunlock_adapt_base_node(DbTableCATree* tb,
                             DbTableCATreeNode* node,
                             DbTableCATreeNode* parent,
                             int current_level)
{
    const Sint stat = BASE_NODE_STAT_READ(node);
    int remove_empty_node;

    dbg_provoke_random_splitjoin(tb,node);

    remove_empty_node = (node->u.base.root == NULL
                         && parent != NULL
                         && !(tb->common.status & DB_CATREE_FORCE_SPLIT));

    if (remove_empty_node || stat < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
        join_catree(tb, node, parent);
        return;
    }
    if (stat > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
        && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
        split_catree(tb, node, parent);
        return;
    }
    wunlock_base_node(node);
}

/*
 * Read locks a base node. If the lock cannot be taken at once,
 * ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION is added to the lock
 * statistics of the node (before the lock is held) and the lock is
 * then waited for. An uncontended read lock does not change the
 * statistics.
 */
static ERTS_INLINE
void rlock_base_node(DbTableCATreeNode *base_node)
{
    ASSERT(base_node->is_base_node);
    if (erts_rwmtx_tryrlock(&base_node->u.base.lock) != EBUSY) {
        return; /* Got the lock at the first attempt */
    }
    /* The lock is contended */
    BASE_NODE_STAT_ADD(base_node, ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION);
    erts_rwmtx_rlock(&base_node->u.base.lock);
}

/*
 * Releases the read lock of a base node and thereafter, with low
 * probability, triggers do_random_join() on the table. The address of
 * the base node is used as seed for the random decision.
 */
static ERTS_INLINE
void runlock_base_node(DbTableCATreeNode *base_node, DbTableCATree* tb)
{
    ASSERT(base_node->is_base_node);
    erts_rwmtx_runlock(&(base_node->u.base.lock));
    /* The lock has been released; base_node is only used as a number */
    do_random_join_with_low_probability(tb, (Uint)base_node);
}

/*
 * Releases the read lock of a base node without triggering any random
 * join (compare with runlock_base_node()).
 */
static ERTS_INLINE
void runlock_base_node_no_rand(DbTableCATreeNode *base_node)
{
    ASSERT(base_node->is_base_node);
    erts_rwmtx_runlock(&(base_node->u.base.lock));
}

/* Locks the mutex of a route node; must not be called for base nodes */
static ERTS_INLINE
void lock_route_node(DbTableCATreeNode *route_node)
{
    ASSERT(!route_node->is_base_node);
    erts_mtx_lock(&(route_node->u.route.lock));
}

/* Unlocks the mutex of a route node; must not be called for base nodes */
static ERTS_INLINE
void unlock_route_node(DbTableCATreeNode *route_node)
{
    ASSERT(!route_node->is_base_node);
    erts_mtx_unlock(&(route_node->u.route.lock));
}

/*
 * Stores a copy of the term 'key' in the route key 'dst' and returns
 * the stored term.
 *
 * 'key_size' is the number of heap words that the copy needs. If it is
 * zero the key must be an immediate term, which is stored as it is.
 * Otherwise the term is copied with copy_struct() into dst->heap, and
 * the off-heap list that the copy produced is saved in dst->oh.
 * The caller must make sure that dst->heap has room for 'key_size'
 * words.
 */
static ERTS_INLINE
Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
{
    dst->size = key_size;

    if (key_size == 0) {
        /* Nothing to copy for an immediate term */
        ASSERT(is_immed(key));
        dst->term = key;
        dst->oh = NULL;
    }
    else {
        ErlOffHeap tmp_offheap;
        Eterm* hp = &dst->heap[0];

        tmp_offheap.first  = NULL;
        dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
        dst->oh = tmp_offheap.first;
    }
    return dst->term;
}

/*
 * Cleans up the off-heap data (if any) that a route key refers to, by
 * passing the saved off-heap list to erts_cleanup_offheap(). The memory
 * of the key itself is not freed here.
 */
static ERTS_INLINE
void destroy_route_key(DbRouteKey* key)
{
    ErlOffHeap oh;

    if (!key->oh) {
        return; /* No off-heap data */
    }
    oh.first = key->oh;
    erts_cleanup_offheap(&oh);
}

/*
 * Initializes a root iterator for the table 'tb'. 'read_only' decides
 * whether base nodes are read locked (non-zero) or write locked (zero)
 * by lock_iter_base_node(). The iterator starts without a locked base
 * node, without a search key and without a next route key.
 */
static ERTS_INLINE
void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
                        int read_only)
{
    /* Configuration */
    iter->tb = tb;
    iter->read_only = read_only;

    /* Iteration state */
    iter->search_key = NULL;
    iter->next_route_key = THE_NON_VALUE;
    iter->locked_bnode = NULL;
}

/*
 * Locks 'base_node' on behalf of the iterator and remembers it in
 * iter->locked_bnode. The iterator must not already hold a base node.
 *
 * A read-only iterator takes a read lock. Otherwise a write lock is
 * taken and 'parent' and 'current_level' are saved in the iterator; they
 * are used by unlock_iter_base_node() when the node is unlocked.
 */
static ERTS_INLINE
void lock_iter_base_node(CATreeRootIterator* iter,
                         DbTableCATreeNode *base_node,
                         DbTableCATreeNode *parent,
                         int current_level)
{
    ASSERT(!iter->locked_bnode);
    if (!iter->read_only) {
        wlock_base_node(base_node);
        iter->bnode_parent = parent;
        iter->bnode_level = current_level;
    }
    else {
        rlock_base_node(base_node);
    }
    iter->locked_bnode = base_node;
}

/*
 * Unlocks the base node that the iterator holds and clears
 * iter->locked_bnode. The iterator must hold a base node.
 *
 * A read-only iterator releases its read lock with runlock_base_node().
 * Otherwise the write lock is released: through
 * wunlock_adapt_base_node() if the node still is valid (which may split
 * or join the node) and through wunlock_base_node() if it is not.
 */
static ERTS_INLINE
void unlock_iter_base_node(CATreeRootIterator* iter)
{
    ASSERT(iter->locked_bnode);
    if (!iter->read_only) {
        if (iter->locked_bnode->u.base.is_valid) {
            wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
                                    iter->bnode_parent, iter->bnode_level);
        }
        else {
            wunlock_base_node(iter->locked_bnode);
        }
    }
    else {
        runlock_base_node(iter->locked_bnode, iter->tb);
    }
    iter->locked_bnode = NULL;
}

/*
 * Releases what the iterator holds: the base node lock (if a base node
 * is locked) and the search key (if one has been allocated). The
 * iterator structure itself is not freed.
 */
static ERTS_INLINE
void destroy_root_iterator(CATreeRootIterator* iter)
{
    if (iter->locked_bnode) {
        unlock_iter_base_node(iter);
    }
    if (!iter->search_key) {
        return;
    }
    destroy_route_key(iter->search_key);
    erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
}

/*
 * Optional extra result of find_base_node(): information about the path
 * from the root to the base node that was found.
 */
typedef struct
{
    /* Route node directly above the base node that was found, or NULL
     * if the root of the CA tree is a base node */
    DbTableCATreeNode *parent;
    /* Number of route nodes passed on the way from the root to the
     * base node (0 if the root is a base node) */
    int current_level;
} FindBaseNode;

/*
 * Finds the base node that is responsible for 'key' by following the
 * route nodes from the root: to the left if 'key' compares less than
 * the key of the route node and to the right otherwise. No locks are
 * taken.
 *
 * If 'fbn' is not NULL it receives the route node directly above the
 * returned base node (NULL if the root is a base node) and the number
 * of route nodes that were passed.
 */
static ERTS_INLINE
DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
                                  FindBaseNode* fbn)
{
    DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
    DbTableCATreeNode* parent = NULL;
    int level = 0;

    for (; !node->is_base_node; level++) {
        parent = node;
        node = (cmp_key_route(key, node) < 0
                ? GET_LEFT_ACQB(node)
                : GET_RIGHT_ACQB(node));
    }
    if (fbn) {
        fbn->parent = parent;
        fbn->current_level = level;
    }
    return node;
}

/*
 * Finds the base node responsible for 'key' and returns it read locked.
 * If the node found turns out to be invalid once it has been locked,
 * it is unlocked and the search is repeated from the root.
 */
static ERTS_INLINE
DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
{
    for (;;) {
        DbTableCATreeNode* base_node = find_base_node(tb, key, NULL);

        rlock_base_node(base_node);
        if (base_node->u.base.is_valid) {
            return base_node;
        }
        /* Invalidated before we got the lock; try again */
        runlock_base_node_no_rand(base_node);
    }
}

/*
 * Finds the base node responsible for 'key' and returns it write locked.
 * If the node found turns out to be invalid once it has been locked,
 * it is unlocked and the search is repeated from the root.
 *
 * 'fbn' is passed on to find_base_node() and thus describes the path to
 * the returned node when the function returns.
 */
static ERTS_INLINE
DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
                                              FindBaseNode* fbn)
{
    for (;;) {
        DbTableCATreeNode* base_node = find_base_node(tb, key, fbn);

        wlock_base_node(base_node);
        if (base_node->u.base.is_valid) {
            return base_node;
        }
        /* Invalidated before we got the lock; try again */
        wunlock_base_node(base_node);
    }
}

/*
 * LC_ORDER(ORDER) expands to ORDER when lock checking is enabled and to
 * NIL otherwise. It is used for the lock order argument when the mutex
 * of a route node is initialized (see create_route_node()).
 */
#ifdef ERTS_ENABLE_LOCK_CHECK
#  define LC_ORDER(ORDER) ORDER
#else
#  define LC_ORDER(ORDER) NIL
#endif

/*
 * Number of bytes to allocate for a base node: the part of
 * DbTableCATreeNode that ends at the u.base.end_of_struct__ marker.
 */
#define sizeof_base_node() \
          offsetof(DbTableCATreeNode, u.base.end_of_struct__)

/*
 * Allocates and initializes a new, valid base node that holds the tree
 * 'root' (which may be NULL).
 *
 * The lock of the node is of the "frequent read" type if the table has
 * DB_FREQ_READ set, and uses erts_ets_rwmtx_spin_count as main spin
 * count if that value is not negative. The lock statistics start at 0,
 * or at INT_MAX if DB_CATREE_FORCE_SPLIT is set for the table.
 */
static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
                                           TreeDbTerm* root)
{
    erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
    DbTableCATreeNode *bnode;
    int initial_stat;

    /* Lock options */
    if (tb->common.type & DB_FREQ_READ) {
        rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
    }
    if (erts_ets_rwmtx_spin_count >= 0) {
        rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
    }

    bnode = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
                          sizeof_base_node());
    bnode->is_base_node = 1;
    bnode->u.base.root = root;

    erts_rwmtx_init_opt(&bnode->u.base.lock, &rwmtx_opt,
                        "erl_db_catree_base_node",
                        NIL,
                        ERTS_LOCK_FLAGS_CATEGORY_DB);

    initial_stat = (tb->common.status & DB_CATREE_FORCE_SPLIT) ? INT_MAX : 0;
    BASE_NODE_STAT_SET(bnode, initial_stat);
    bnode->u.base.is_valid = 1;
    return bnode;
}

/*
 * Number of bytes to allocate for a route node whose key needs
 * 'key_size' heap words: the node up to the start of the key heap plus
 * the key heap itself.
 */
static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
{
    const Uint key_heap_bytes = key_size*sizeof(Eterm);

    return offsetof(DbTableCATreeNode, u.route.key.heap) + key_heap_bytes;
}

/*
 * Allocates and initializes a new, valid route node.
 *
 * tb        - the table that the node belongs to
 * left      - initial left child
 * right     - initial right child
 * keyTerm   - object whose key (GETKEY) is copied into the node and used
 *             as routing key
 * lc_parent - only used when lock checking is enabled: the route node
 *             that will be the parent of the new node, or NULL if there
 *             is none; decides the lock order of the new node
 */
static DbTableCATreeNode*
create_route_node(DbTableCATree *tb,
                  DbTableCATreeNode *left,
                  DbTableCATreeNode *right,
                  DbTerm * keyTerm,
                  DbTableCATreeNode* lc_parent)
{
    Eterm key = GETKEY(tb,keyTerm->tpl);
    int key_size = size_object(key);
    DbTableCATreeNode* rnode;

    /* The node is allocated with room for the copy of the key */
    rnode = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
                          (DbTable *) tb,
                          sizeof_route_node(key_size));

    rnode->is_base_node = 0;
    rnode->u.route.is_valid = 1;
    copy_route_key(&rnode->u.route.key, key, key_size);
    erts_atomic_init_nob(&rnode->u.route.left, (erts_aint_t)left);
    erts_atomic_init_nob(&rnode->u.route.right, (erts_aint_t)right);
#ifdef ERTS_ENABLE_LOCK_CHECK
    /* Route node lock order is inverse tree depth (from leafs toward root) */
    if (lc_parent == NULL) {
        rnode->u.route.lc_order = MAX_SMALL;
    }
    else {
        rnode->u.route.lc_order = lc_parent->u.route.lc_order - 1;
    }
    /*
     * This assert may eventually fail as we don't increase 'lc_order' in join
     * operations when route nodes move up in the tree.
     * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
     */
    ERTS_LC_ASSERT(rnode->u.route.lc_order >= 0);
#endif
    erts_mtx_init(&rnode->u.route.lock, "erl_db_catree_route_node",
                  LC_ORDER(make_small(rnode->u.route.lc_order)),
                  ERTS_LOCK_FLAGS_CATEGORY_DB);
    return rnode;
}

/*
 * Destroy the lock of a base node and release the node's memory.
 * Takes a void pointer so that it can be passed as the free callback to
 * erts_schedule_db_free().
 */
static void do_free_base_node(void* vptr)
{
    DbTableCATreeNode *bnode = vptr;

    ASSERT(bnode->is_base_node);
    erts_rwmtx_destroy(&bnode->u.base.lock);
    erts_free(ERTS_ALC_T_DB_TABLE, bnode);
}

/*
 * Free base node 'p' immediately: report sizeof_base_node() bytes to
 * ERTS_DB_ALC_MEM_UPDATE_ for table 'tb', then destroy and release the
 * node.
 */
static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    const Uint node_bytes = sizeof_base_node();

    ASSERT(p->is_base_node);
    ERTS_DB_ALC_MEM_UPDATE_(tb, node_bytes, 0);
    do_free_base_node(p);
}

/*
 * Destroy the lock and the key copy of a route node and release the
 * node's memory. Takes a void pointer so that it can be passed as the
 * free callback to erts_schedule_db_free().
 */
static void do_free_route_node(void *vptr)
{
    DbTableCATreeNode *rnode = vptr;

    ASSERT(!rnode->is_base_node);
    erts_mtx_destroy(&rnode->u.route.lock);
    destroy_route_key(&rnode->u.route.key);
    erts_free(ERTS_ALC_T_DB_TABLE, rnode);
}

/*
 * Free route node 'p' immediately: report the node's size (which depends
 * on the size of its key) to ERTS_DB_ALC_MEM_UPDATE_ for table 'tb', then
 * destroy and release the node.
 */
static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    Uint node_bytes;

    ASSERT(!p->is_base_node);
    node_bytes = sizeof_route_node(p->u.route.key.size);
    ERTS_DB_ALC_MEM_UPDATE_(tb, node_bytes, 0);
    do_free_route_node(p);
}


/*
 * Locate the route node directly above the route node 'child' by
 * descending from the table root, steering with the child's own route key.
 *
 * Returns that parent, or NULL if 'child' is the node the table root
 * points to.
 */
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
          DbTableCATreeNode *child)
{
    Eterm child_key = GET_ROUTE_NODE_KEY(child);
    DbTableCATreeNode *above = NULL;
    DbTableCATreeNode *walker;

    for (walker = GET_ROOT_ACQB(tb); walker != child; ) {
        above = walker;
        walker = (cmp_key_route(child_key, walker) < 0
                  ? GET_LEFT_ACQB(walker)
                  : GET_RIGHT_ACQB(walker));
    }
    return above;
}


/*
 * Follow left children from 'root' until a base node is reached and
 * return it ('root' itself if it already is a base node).
 */
static ERTS_INLINE DbTableCATreeNode *
leftmost_base_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *cursor = root;

    for (;;) {
        if (cursor->is_base_node) {
            return cursor;
        }
        cursor = GET_LEFT_ACQB(cursor);
    }
}


/*
 * Follow right children from 'root' until a base node is reached and
 * return it ('root' itself if it already is a base node).
 */
static ERTS_INLINE DbTableCATreeNode *
rightmost_base_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *cursor = root;

    for (;;) {
        if (cursor->is_base_node) {
            return cursor;
        }
        cursor = GET_RIGHT_ACQB(cursor);
    }
}


/*
 * Follow left children from 'root' and return the last route node seen
 * before reaching a base node, or NULL if 'root' is itself a base node.
 */
static ERTS_INLINE DbTableCATreeNode *
leftmost_route_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *last_route = NULL;
    DbTableCATreeNode *cursor;

    for (cursor = root; !cursor->is_base_node; cursor = GET_LEFT_ACQB(cursor)) {
        last_route = cursor;
    }
    return last_route;
}

/*
 * Follow right children from 'root' and return the last route node seen
 * before reaching a base node, or NULL if 'root' is itself a base node.
 */
static ERTS_INLINE DbTableCATreeNode*
rightmost_route_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *last_route = NULL;
    DbTableCATreeNode *cursor;

    for (cursor = root; !cursor->is_base_node; cursor = GET_RIGHT_ACQB(cursor)) {
        last_route = cursor;
    }
    return last_route;
}

/*
 * Reset 'stack' to an empty stack backed by 'stack_array', with its slot
 * field set to 'init_slot'.
 */
static ERTS_INLINE
void init_tree_stack(DbTreeStack *stack,
                     TreeDbTerm **stack_array,
                     Uint init_slot)
{
    stack->pos = 0;
    stack->slot = init_slot;
    stack->array = stack_array;
}

/*
 * Try to join base node 'thiz' with its neighboring base node and remove
 * the route node 'parent' that separates them.
 *
 * 'thiz' must be a base node; it is write-unlocked on every path, so the
 * caller is expected to hold its write lock. 'parent' is the route node
 * directly above 'thiz', or NULL, in which case nothing is joined.
 *
 * The join is abandoned (after resetting the statistic of 'thiz') when
 * the neighbor's write lock cannot be taken with a try-lock or when the
 * neighbor is no longer valid.
 *
 * On success 'parent', 'thiz' and the old neighbor are marked invalid and
 * handed to erts_schedule_db_free(); 'parent' is replaced in the tree by
 * its other child and a new base node holding the joined tree replaces
 * the old neighbor.
 */
static void join_catree(DbTableCATree *tb,
                        DbTableCATreeNode *thiz,
                        DbTableCATreeNode *parent)
{
    DbTableCATreeNode *gparent;
    DbTableCATreeNode *neighbor;
    DbTableCATreeNode *new_neighbor;
    DbTableCATreeNode *neighbor_parent;

    ASSERT(thiz->is_base_node);
    if (parent == NULL) {
        BASE_NODE_STAT_SET(thiz, 0);
        wunlock_base_node(thiz);
        return;
    }
    ASSERT(!parent->is_base_node);
    if (GET_LEFT(parent) == thiz) {
        /* Neighbor is the leftmost base node of parent's right subtree */
        neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
        if (try_wlock_base_node(&neighbor->u.base)) {
            /* Failed to acquire lock */
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            return;
        } else if (!neighbor->u.base.is_valid) {
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            wunlock_base_node(neighbor);
            return;
        } else {
            lock_route_node(parent);
            parent->u.route.is_valid = 0;
            neighbor->u.base.is_valid = 0;
            thiz->u.base.is_valid = 0;
            /* Look up and lock the grandparent until a valid one is held */
            gparent = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent = parent_of(tb, parent);
                if (gparent != NULL)
                    lock_route_node(gparent);
            } while (gparent != NULL && !gparent->u.route.is_valid);

            /* Replace 'parent' by its right child */
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_RIGHT(parent));
            } else if (GET_LEFT(gparent) == parent) {
                SET_LEFT_RELB(gparent, GET_RIGHT(parent));
            } else {
                SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            {
                TreeDbTerm* new_root = join_trees(thiz->u.base.root,
                                                  neighbor->u.base.root);
                new_neighbor = create_base_node(tb, new_root);
            }
            if (GET_RIGHT(parent) == neighbor) {
                neighbor_parent = gparent;
            } else {
                neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
            }
        }
    } else { /* Symmetric case */
        ASSERT(GET_RIGHT(parent) == thiz);
        /* Neighbor is the rightmost base node of parent's left subtree */
        neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
        if (try_wlock_base_node(&neighbor->u.base)) {
            /* Failed to acquire lock */
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            return;
        } else if (!neighbor->u.base.is_valid) {
            BASE_NODE_STAT_SET(thiz, 0);
            wunlock_base_node(thiz);
            wunlock_base_node(neighbor);
            return;
        } else {
            lock_route_node(parent);
            parent->u.route.is_valid = 0;
            neighbor->u.base.is_valid = 0;
            thiz->u.base.is_valid = 0;
            /* Look up and lock the grandparent until a valid one is held */
            gparent = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent = parent_of(tb, parent);
                if (gparent != NULL)
                    lock_route_node(gparent);
            } while (gparent != NULL && !gparent->u.route.is_valid);

            /* Replace 'parent' by its left child */
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_LEFT(parent));
            } else if (GET_RIGHT(gparent) == parent) {
                SET_RIGHT_RELB(gparent, GET_LEFT(parent));
            } else {
                SET_LEFT_RELB(gparent, GET_LEFT(parent));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            {
                TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
                                                  thiz->u.base.root);
                new_neighbor = create_base_node(tb, new_root);
            }
            if (GET_LEFT(parent) == neighbor) {
                neighbor_parent = gparent;
            } else {
                neighbor_parent =
                    rightmost_route_node(GET_LEFT(parent));
            }
        }
    }
    /* Link in new neighbor and free nodes that are no longer in the tree */
    if (neighbor_parent == NULL) {
        SET_ROOT_RELB(tb, new_neighbor);
    } else if (GET_LEFT(neighbor_parent) == neighbor) {
        SET_LEFT_RELB(neighbor_parent, new_neighbor);
    } else {
        SET_RIGHT_RELB(neighbor_parent, new_neighbor);
    }
    wunlock_base_node(thiz);
    wunlock_base_node(neighbor);
    /* Free the parent and base */
    erts_schedule_db_free(&tb->common,
                          do_free_route_node,
                          parent,
                          &parent->u.route.free_item,
                          sizeof_route_node(parent->u.route.key.size));
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          thiz,
                          &thiz->u.base.free_item,
                          sizeof_base_node());
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          neighbor,
                          &neighbor->u.base.free_item,
                          sizeof_base_node());
}

/*
 * Split base node 'base' into two base nodes joined by a new route node.
 *
 * 'base' is write-unlocked on every path, so the caller is expected to
 * hold its write lock. 'parent' is the route node above 'base', or NULL
 * when 'base' is the tree root.
 *
 * When 'base' holds fewer than two elements no split is done; its
 * statistic is reset unless the table has DB_CATREE_FORCE_SPLIT set.
 * Otherwise the new route node takes the place of 'base', which is marked
 * invalid and handed to erts_schedule_db_free().
 */
static void split_catree(DbTableCATree *tb,
                         DbTableCATreeNode* ERTS_RESTRICT base,
                         DbTableCATreeNode* ERTS_RESTRICT parent)
{
    TreeDbTerm *split_term;
    TreeDbTerm *lower_tree;
    TreeDbTerm *upper_tree;
    DbTableCATreeNode* ERTS_RESTRICT lower_base;
    DbTableCATreeNode* ERTS_RESTRICT upper_base;
    DbTableCATreeNode* ERTS_RESTRICT route;

    if (less_than_two_elements(base->u.base.root)) {
        /* Too small to split */
        if ((tb->common.status & DB_CATREE_FORCE_SPLIT) == 0) {
            BASE_NODE_STAT_SET(base, 0);
        }
        wunlock_base_node(base);
        return;
    }

    split_tree(tb, base->u.base.root, &split_term,
               &lower_tree, &upper_tree);

    lower_base = create_base_node(tb, lower_tree);
    upper_base = create_base_node(tb, upper_tree);
    route = create_route_node(tb, lower_base, upper_base,
                              &split_term->dbterm, parent);

    /* Publish the new route node in place of 'base' */
    if (parent == NULL) {
        SET_ROOT_RELB(tb, route);
    } else if (GET_LEFT(parent) == base) {
        SET_LEFT_RELB(parent, route);
    } else {
        SET_RIGHT_RELB(parent, route);
    }

    base->u.base.is_valid = 0;
    wunlock_base_node(base);
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          base,
                          &base->u.base.free_item,
                          sizeof_base_node());
}

/* @brief Free the entire catree and its sub-trees.
 *
 * May be called repeatedly: each call restarts from the current tree root
 * and continues until done or until the reductions run out. The first
 * call sets tb->deletion and clears tb->nr_of_deleted_items.
 *
 * @param tbl  The table to free.
 * @param reds Reductions to spend.
 * @return Reductions left. Negative value if not done.
 */
static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *node;
    DbTableCATreeNode *parent;
    CATreeNodeStack rnode_stack;
    DbTableCATreeNode *rnode_stack_array[STACK_NEED];

    if (!tb->deletion) {
        /* First call */
        tb->deletion = 1;
        tb->nr_of_deleted_items = 0;
    }

    /*
     * The route tree is traversed and freed while keeping it consistent
     * during yields.
     */
    rnode_stack.array = rnode_stack_array;
    rnode_stack.pos = 0;
    rnode_stack.size = STACK_NEED;

    node = GET_ROOT(tb);
    if (node->is_base_node) {
        /* The whole tree is a single base node */
        if (node->u.base.root) {
            reds = do_delete_base_node_cont(tb, node, reds);
            if (reds < 0)
                return reds; /* Yield */
        }
        free_catree_base_node(tb, node);
    }
    else {
        for (;;) {
            DbTableCATreeNode* left = GET_LEFT(node);
            DbTableCATreeNode* right = GET_RIGHT(node);

            /* Descend until a route node with two base node children */
            if (!left->is_base_node) {
                PUSH_NODE(&rnode_stack, node);
                node = left;
            }
            else if (!right->is_base_node) {
                PUSH_NODE(&rnode_stack, node);
                node = right;
            }
            else {
                /* Empty both base nodes; may yield with the tree intact */
                if (left->u.base.root) {
                    reds = do_delete_base_node_cont(tb, left, reds);
                    if (reds < 0)
                        return reds; /* Yield */
                }
                if (right->u.base.root) {
                    reds = do_delete_base_node_cont(tb, right, reds);
                    if (reds < 0)
                        return reds; /* Yield */
                }

                free_catree_base_node(tb, right);
                free_catree_route_node(tb, node);
                /*
                 * Keep empty left base node to join with its grandparent
                 * for tree consistency during yields.
                 */

                parent = POP_NODE(&rnode_stack);
                if (parent) {
                    /* 'node' is only compared by address here, it is freed */
                    if (node == GET_LEFT(parent)) {
                        SET_LEFT(parent, left);
                    }
                    else {
                        ASSERT(node == GET_RIGHT(parent));
                        SET_RIGHT(parent, left);
                    }

                    reds -= 2;
                    if (reds < 0)
                        return reds; /* Yield */

                    node = parent;
                }
                else {  /* Done */
                    /* 'node' was the root: the kept left base node goes too */
                    free_catree_base_node(tb, left);
                    break;
                }
            }
        }
    }

    ASSERT(reds >= 0);
    SET_ROOT(tb, NULL);
    return reds;
}

/* @brief Free all objects of a base node, but keep the base node.
 *
 * Objects are freed leaves first. The base node's root pointer is cleared
 * only once every object is gone, so a yielding call can be repeated on
 * the same base node. The root of 'base_node' must not be NULL.
 *
 * @param tb        The table owning the base node.
 * @param base_node The base node to empty.
 * @param reds      Reductions to spend.
 * @return Reductions left. Negative value if not done.
 */
static SWord do_delete_base_node_cont(DbTableCATree *tb,
                                      DbTableCATreeNode *base_node,
                                      SWord reds)
{
    TreeDbTerm* stack_array[STACK_NEED];
    DbTreeStack stack;
    TreeDbTerm *cur;

    stack.array = stack_array;
    stack.pos = 0;

    cur = base_node->u.base.root;
    for (;;) {
        TreeDbTerm *child = cur->left ? cur->left : cur->right;
        TreeDbTerm *above;

        if (child != NULL) {
            /* Not a leaf yet: remember this node and descend */
            PUSH_NODE(&stack, cur);
            cur = child;
            continue;
        }

        /* 'cur' is a leaf: account for it and free it */
        DEC_NITEMS((DbTable*)tb);
        tb->nr_of_deleted_items++;
        free_term((DbTable*)tb, cur);

        above = POP_NODE(&stack);
        if (above == NULL) {
            /* That was the last object of the base node */
            base_node->u.base.root = NULL;
            return reds;
        }

        /* Detach the freed leaf from the node above it */
        if (above->left == cur) {
            above->left = NULL;
        } else {
            ASSERT(above->right == cur);
            above->right = NULL;
        }

        reds--;
        if (reds < 0) {
            return reds; /* Yield */
        }
        cur = above;
    }
}


/*
** Initialization function
**
** Intentionally empty: this function performs no set-up work.
*/

void db_initialize_catree(void)
{
    /* Nothing to do */
}

/*
** Table interface routines (i.e., what's called by the bif's)
*/

/*
 * Initialize the CA tree part of a newly created table: the tree starts
 * as one empty base node and the deletion state is cleared. In DEBUG
 * builds DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN is also set on the table.
 *
 * 'p' is not used. Always returns DB_ERROR_NONE.
 */
int db_create_catree(Process *p, DbTable *tbl)
{
    DbTableCATree *catree = &tbl->catree;
    DbTableCATreeNode *empty_base = create_base_node(catree, NULL);

    catree->nr_of_deleted_items = 0;
    catree->deletion = 0;
#ifdef DEBUG
    tbl->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
#endif
    erts_atomic_init_relb(&catree->root, (erts_aint_t)empty_base);
    return DB_ERROR_NONE;
}

/*
 * Produce the first key of the table in *ret through
 * db_first_tree_common(), using the first base node that has a non-NULL
 * root (or a NULL root when all base nodes are empty).
 */
static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    TreeDbTerm *root;
    int result;

    init_root_iterator(&tbl->catree, &iter, 1);

    rootp = catree_find_first_root(&iter);
    root = *rootp;
    if (root == NULL) {
        /* First base node is empty: move on to the next non-empty one */
        rootp = catree_find_next_root(&iter, NULL);
        root = (rootp != NULL) ? *rootp : NULL;
    }

    result = db_first_tree_common(p, tbl, root, ret, NULL);

    destroy_root_iterator(&iter);
    return result;
}

/*
 * Produce the key following 'key' in *ret. Base nodes are visited in
 * forward order starting from the one selected by 'key'; the search moves
 * on to the next base node for as long as db_next_tree_common() succeeds
 * but reports am_EOT.
 */
static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    TreeDbTerm * stack_array[STACK_NEED];
    DbTreeStack stack;
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    int result;

    init_root_iterator(&tbl->catree, &iter, 1);
    iter.next_route_key = key;
    rootp = catree_find_next_root(&iter, NULL);

    for (;;) {
        TreeDbTerm *root = (rootp != NULL) ? *rootp : NULL;

        init_tree_stack(&stack, stack_array, 0);
        result = db_next_tree_common(p, tbl, root, key, ret, &stack);
        if (result != DB_ERROR_NONE) {
            break;
        }
        if (*ret != am_EOT) {
            break;
        }

        /* Nothing after 'key' in this base node: try the next one */
        rootp = catree_find_next_root(&iter, NULL);
        if (rootp == NULL) {
            break;
        }
    }

    destroy_root_iterator(&iter);
    return result;
}

/*
 * Produce the last key of the table in *ret through
 * db_last_tree_common(), using the last base node that has a non-NULL
 * root (or a NULL root when all base nodes are empty).
 */
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    TreeDbTerm *root;
    int result;

    init_root_iterator(&tbl->catree, &iter, 1);

    rootp = catree_find_last_root(&iter);
    root = *rootp;
    if (root == NULL) {
        /* Last base node is empty: move on to the previous non-empty one */
        rootp = catree_find_prev_root(&iter, NULL);
        root = (rootp != NULL) ? *rootp : NULL;
    }

    result = db_last_tree_common(p, tbl, root, ret, NULL);

    destroy_root_iterator(&iter);
    return result;
}

/*
 * Produce the key preceding 'key' in *ret. Base nodes are visited in
 * reverse order starting from the one selected by 'key'; the search moves
 * on to the previous base node for as long as db_prev_tree_common()
 * succeeds but reports am_EOT.
 */
static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    TreeDbTerm * stack_array[STACK_NEED];
    DbTreeStack stack;
    CATreeRootIterator iter;
    TreeDbTerm **rootp;
    int result;

    init_root_iterator(&tbl->catree, &iter, 1);
    iter.next_route_key = key;
    rootp = catree_find_prev_root(&iter, NULL);

    for (;;) {
        TreeDbTerm *root = (rootp != NULL) ? *rootp : NULL;

        init_tree_stack(&stack, stack_array, 0);
        result = db_prev_tree_common(p, tbl, root, key, ret, &stack);
        if (result != DB_ERROR_NONE) {
            break;
        }
        if (*ret != am_EOT) {
            break;
        }

        /* Nothing before 'key' in this base node: try the previous one */
        rootp = catree_find_prev_root(&iter, NULL);
        if (rootp == NULL) {
            break;
        }
    }

    destroy_root_iterator(&iter);
    return result;
}

/*
 * Insert 'obj' into the base node responsible for its key.
 * The base node is write-locked for the insertion and then released with
 * wunlock_adapt_base_node(). Returns what db_put_tree_common() returns.
 */
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableCATree *tb = &tbl->catree;
    FindBaseNode fbn;
    DbTableCATreeNode *bnode;
    Eterm key;
    int result;

    key = GETKEY(&tb->common, tuple_val(obj));
    bnode = find_wlock_valid_base_node(tb, key, &fbn);
    result = db_put_tree_common(&tb->common, &bnode->u.base.root, obj,
                                key_clash_fail, NULL);
    wunlock_adapt_base_node(tb, bnode, fbn.parent, fbn.current_level);
    return result;
}

/*
 * Look up 'key' in the base node responsible for it, under that node's
 * read lock. Returns what db_get_tree_common() returns.
 */
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *bnode;
    int result;

    bnode = find_rlock_valid_base_node(tb, key);
    result = db_get_tree_common(p, &tb->common, bnode->u.base.root,
                                key, ret, NULL);
    runlock_base_node(bnode, tb);
    return result;
}

/*
 * Find and lock (through the iterator) the valid base node responsible
 * for 'key'. Base nodes found invalid after locking are unlocked and the
 * search is repeated.
 *
 * Returns a pointer to the root pointer of the locked base node.
 */
TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
{
    FindBaseNode fbn;
    DbTableCATreeNode *candidate;

    for (;;) {
        candidate = find_base_node(iter->tb, key, &fbn);
        lock_iter_base_node(iter, candidate, fbn.parent, fbn.current_level);
        if (candidate->u.base.is_valid) {
            return &candidate->u.base.root;
        }
        /* Invalidated before we got the lock: retry */
        unlock_iter_base_node(iter);
    }
}

/*
 * Make the iterator keep its own copy of 'key'.
 *
 * Immediate terms and a key that already is the iterator's saved copy are
 * returned unchanged. Otherwise any previously saved key is destroyed,
 * the iterator's key buffer is (re)allocated if it is missing or too
 * small, and the copy made by copy_route_key() is returned.
 */
static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key)
{
    Uint words;
    int must_grow;

    if (is_immed(key)) {
        return key;
    }

    if (iter->search_key != NULL) {
        if (iter->search_key->term == key) {
            return key; /* already saved */
        }
        destroy_route_key(iter->search_key);
    }

    words = size_object(key);
    must_grow = (iter->search_key == NULL
                 || iter->search_key->size < words);
    if (must_grow) {
        iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
                                        iter->search_key,
                                        (offsetof(DbRouteKey, heap)
                                         + words*sizeof(Eterm)));
    }
    return copy_route_key(iter->search_key, key, words);
}

/*
 * Advance a root iterator to the next (forward != 0) or previous
 * (forward == 0) non-empty base node, relative to iter->next_route_key.
 *
 * If the iterator holds a locked base node, that node is unlocked first;
 * when 'search_keyp' is non-NULL, *search_keyp is replaced beforehand by
 * the value returned from save_iter_search_key().
 *
 * Returns a pointer to the root pointer of the found base node, which is
 * left locked and recorded in iter->locked_bnode, or NULL when there is
 * no further base node in that direction (nothing is left locked then).
 */
TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter,
                                       int forward,
                                       Eterm *search_keyp)
{
#ifdef DEBUG
    DbTableCATreeNode *rejected_invalid = NULL;
    DbTableCATreeNode *rejected_empty = NULL;
#endif
    DbTableCATreeNode *node;
    DbTableCATreeNode *parent;
    DbTableCATreeNode* next_route_node;
    Eterm route_key = iter->next_route_key;
    int current_level;

    if (iter->locked_bnode) {
        /* Save the caller's key before the current base node is released */
        if (search_keyp)
            *search_keyp = save_iter_search_key(iter, *search_keyp);
        unlock_iter_base_node(iter);
    }

    /* No route key to continue from: the iteration is finished */
    if (is_non_value(route_key))
        return NULL;

    while (1) {
        node = GET_ROOT_ACQB(iter->tb);
        current_level = 0;
        parent = NULL;
        next_route_node = NULL;
        /*
         * Descend to the base node selected by 'route_key', remembering
         * in 'next_route_node' the last route node where the descent
         * turned away from the iteration direction.
         */
        while (!node->is_base_node) {
            current_level++;
            parent = node;
            if (forward) {
                if (cmp_key_route(route_key,node) < 0) {
                    next_route_node = node;
                    node = GET_LEFT_ACQB(node);
                } else {
                    node = GET_RIGHT_ACQB(node);
                }
            }
            else {
                if (cmp_key_route(route_key,node) > 0) {
                    next_route_node = node;
                    node = GET_RIGHT_ACQB(node);
                } else {
                    node = GET_LEFT_ACQB(node);
                }
            }
        }
        ASSERT(node != rejected_invalid);
        lock_iter_base_node(iter, node, parent, current_level);
        if (node->u.base.is_valid) {
            ASSERT(node != rejected_empty);
            if (node->u.base.root) {
                /* Found a non-empty base node: remember where to go next */
                iter->next_route_key = (next_route_node ?
                                        next_route_node->u.route.key.term :
                                        THE_NON_VALUE);
                iter->locked_bnode = node;
                return &node->u.base.root;
            }
            /* Empty base node with nothing beyond it: end of iteration */
            if (!next_route_node) {
                unlock_iter_base_node(iter);
                return NULL;
            }
            /* Empty base node: continue from the remembered route key */
            route_key = next_route_node->u.route.key.term;
            IF_DEBUG(rejected_empty = node);
        }
        else
            IF_DEBUG(rejected_invalid = node);

        /* Retry */
        unlock_iter_base_node(iter);
    }
}

/* Forward variant of catree_find_nextprev_root(). */
TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
{
    const int forward = 1;

    return catree_find_nextprev_root(iter, forward, keyp);
}

/* Backward variant of catree_find_nextprev_root(). */
TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
{
    const int forward = 0;

    return catree_find_nextprev_root(iter, forward, keyp);
}

/* @brief Find root of tree where object with smallest key of all larger than
 * partially bound key may reside. Can be used as a starting point for
 * a reverse iteration with pb_key.
 *
 * The iterator must not hold a locked base node on entry. The found base
 * node is locked through the iterator, and iter->next_route_key is set to
 * the key of the last route node where the descent went right, or to
 * THE_NON_VALUE if it never did.
 *
 * @param pb_key The partially bound key. Example {42, '$1'}
 * @param iter An initialized root iterator.
 *
 * @return Pointer to found root pointer. Never NULL.
 */
TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key,
                                               CATreeRootIterator* iter)
{
#ifdef DEBUG
    DbTableCATreeNode *rejected_base = NULL;
#endif
    DbTableCATreeNode *node;
    DbTableCATreeNode *parent;
    DbTableCATreeNode* next_route_node;
    int current_level;

    ASSERT(!iter->locked_bnode);

    while (1) {
        node = GET_ROOT_ACQB(iter->tb);
        current_level = 0;
        parent = NULL;
        next_route_node = NULL;
        while (!node->is_base_node) {
            current_level++;
            parent = node;
            /* Go right unless pb_key compares less than the route key */
            if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) {
                next_route_node = node;
                node = GET_RIGHT_ACQB(node);
            } else {
                node = GET_LEFT_ACQB(node);
            }
        }
        ASSERT(node != rejected_base);
        lock_iter_base_node(iter, node, parent, current_level);
        if (node->u.base.is_valid) {
            iter->next_route_key = (next_route_node ?
                                    next_route_node->u.route.key.term :
                                    THE_NON_VALUE);
            return &node->u.base.root;
        }
        /* Retry */
        unlock_iter_base_node(iter);
#ifdef DEBUG
        rejected_base = node;
#endif
    }
}

/* @brief Find root of tree where object with largest key of all smaller than
 * partially bound key may reside. Can be used as a starting point for
 * a forward iteration with the partially bound key.
 *
 * The iterator must not hold a locked base node on entry. The found base
 * node is locked through the iterator, and iter->next_route_key is set to
 * the key of the last route node where the descent went left, or to
 * THE_NON_VALUE if it never did.
 *
 * @param key The partially bound key. Example {42, '$1'}
 * @param iter An initialized root iterator.
 *
 * @return Pointer to found root pointer. Never NULL.
 */
TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
                                               CATreeRootIterator* iter)
{
#ifdef DEBUG
    DbTableCATreeNode *rejected_base = NULL;
#endif
    DbTableCATreeNode *node;
    DbTableCATreeNode *parent;
    DbTableCATreeNode* next_route_node;
    int current_level;

    ASSERT(!iter->locked_bnode);

    while (1) {
        node = GET_ROOT_ACQB(iter->tb);
        current_level = 0;
        parent = NULL;
        next_route_node = NULL;
        while (!node->is_base_node) {
            current_level++;
            parent = node;
            /* Go left unless key compares greater than the route key */
            if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
                next_route_node = node;
                node = GET_LEFT_ACQB(node);
            } else {
                node = GET_RIGHT_ACQB(node);
            }
        }
        ASSERT(node != rejected_base);
        lock_iter_base_node(iter, node, parent, current_level);
        if (node->u.base.is_valid) {
            iter->next_route_key = (next_route_node ?
                                    next_route_node->u.route.key.term :
                                    THE_NON_VALUE);
            return &node->u.base.root;
        }
        /* Retry */
        unlock_iter_base_node(iter);
#ifdef DEBUG
        rejected_base = node;
#endif
    }
}

/*
 * Find and lock (through the iterator) the leftmost (first != 0) or
 * rightmost (first == 0) valid base node of the tree.
 *
 * iter->next_route_key is set to the key of the route node directly above
 * the found base node, or to THE_NON_VALUE if the base node is the root.
 * Base nodes found invalid after locking are unlocked and the search is
 * repeated.
 *
 * Returns a pointer to the root pointer of the locked base node.
 */
static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
                                               int first)
{
#ifdef DEBUG
    DbTableCATreeNode *rejected_base = NULL;
#endif
    DbTableCATreeNode *bnode;
    DbTableCATreeNode *last_route;
    int depth;

    for (;;) {
        bnode = GET_ROOT_ACQB(iter->tb);
        last_route = NULL;
        for (depth = 0; !bnode->is_base_node; depth++) {
            last_route = bnode;
            if (first) {
                bnode = GET_LEFT_ACQB(bnode);
            } else {
                bnode = GET_RIGHT_ACQB(bnode);
            }
        }
        ASSERT(bnode != rejected_base);
        lock_iter_base_node(iter, bnode, last_route, depth);
        if (!bnode->u.base.is_valid) {
            /* Retry */
            unlock_iter_base_node(iter);
#ifdef DEBUG
            rejected_base = bnode;
#endif
            continue;
        }

        if (last_route != NULL) {
            iter->next_route_key = last_route->u.route.key.term;
        } else {
            iter->next_route_key = THE_NON_VALUE;
        }
        return &bnode->u.base.root;
    }
}

/* Find and lock the leftmost valid base node; see catree_find_firstlast_root(). */
TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
{
    const int first = 1;

    return catree_find_firstlast_root(iter, first);
}

/* Find and lock the rightmost valid base node; see catree_find_firstlast_root(). */
TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
{
    const int first = 0;

    return catree_find_firstlast_root(iter, first);
}

/*
 * Membership test for 'key' in the base node responsible for it, under
 * that node's read lock. Returns what db_member_tree_common() returns.
 */
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *bnode;
    int result;

    bnode = find_rlock_valid_base_node(tb, key);
    result = db_member_tree_common(&tb->common, bnode->u.base.root,
                                   key, ret, NULL);
    runlock_base_node(bnode, tb);
    return result;
}

/*
 * Fetch element 'ndex' of the object stored under 'key', reading the base
 * node responsible for the key under its read lock. Returns what
 * db_get_element_tree_common() returns.
 */
static int db_get_element_catree(Process *p, DbTable *tbl,
                                 Eterm key, int ndex, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *bnode;
    int result;

    bnode = find_rlock_valid_base_node(tb, key);
    result = db_get_element_tree_common(p, &tb->common, bnode->u.base.root,
                                        key, ndex, ret, NULL);
    runlock_base_node(bnode, tb);
    return result;
}

/*
 * Erase 'key' from the base node responsible for it. The base node is
 * write-locked for the operation and then released with
 * wunlock_adapt_base_node(). Returns what db_erase_tree_common() returns.
 */
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    FindBaseNode fbn;
    DbTableCATreeNode *bnode;
    int result;

    bnode = find_wlock_valid_base_node(tb, key, &fbn);
    result = db_erase_tree_common(tbl, &bnode->u.base.root, key, ret, NULL);
    wunlock_adapt_base_node(tb, bnode, fbn.parent, fbn.current_level);
    return result;
}

/*
 * Erase 'object' from the base node responsible for the object's key.
 * The base node is write-locked for the operation and then released with
 * wunlock_adapt_base_node(). Returns what db_erase_object_tree_common()
 * returns.
 */
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    FindBaseNode fbn;
    DbTableCATreeNode *bnode;
    Eterm key;
    int result;

    key = GETKEY(&tb->common, tuple_val(object));
    bnode = find_wlock_valid_base_node(tb, key, &fbn);
    result = db_erase_object_tree_common(tbl, &bnode->u.base.root,
                                         object, ret, NULL);
    wunlock_adapt_base_node(tb, bnode, fbn.parent, fbn.current_level);
    return result;
}


/*
 * Slot lookup: sets up a root iterator, hands the root of the first base
 * node together with the iterator to db_slot_tree_common(), and tears the
 * iterator down again.
 */
static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term, Eterm *ret)
{
    CATreeRootIterator iter;
    TreeDbTerm **first_rootp;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    first_rootp = catree_find_first_root(&iter);
    status = db_slot_tree_common(p, tbl, *first_rootp,
                                 slot_term, ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Continue a select: sets up a root iterator, delegates to
 * db_select_continue_tree_common(), and tears the iterator down again.
 * 'safety_p' is not used here.
 */
static int db_select_continue_catree(Process *p,
                                     DbTable *tbl,
                                     Eterm continuation,
                                     Eterm *ret,
                                     enum DbIterSafety* safety_p)
{
    CATreeRootIterator iter;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    status = db_select_continue_tree_common(p, &tbl->common, continuation,
                                            ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Select: sets up a root iterator, delegates to db_select_tree_common(),
 * and tears the iterator down again. 'safety' is not used here.
 */
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reverse, Eterm *ret,
                            enum DbIterSafety safety)
{
    CATreeRootIterator iter;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    status = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
                                   NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Continue a select-count: sets up a root iterator, delegates to
 * db_select_count_continue_tree_common(), and tears the iterator down
 * again. 'safety_p' is not used here.
 */
static int db_select_count_continue_catree(Process *p,
                                           DbTable *tbl,
                                           Eterm continuation,
                                           Eterm *ret,
                                           enum DbIterSafety* safety_p)
{
    CATreeRootIterator iter;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    status = db_select_count_continue_tree_common(p, tbl, continuation,
                                                  ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Select-count: sets up a root iterator, delegates to
 * db_select_count_tree_common(), and tears the iterator down again.
 * 'safety' is not used here.
 */
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret,
                                  enum DbIterSafety safety)
{
    CATreeRootIterator iter;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    status = db_select_count_tree_common(p, tbl, tid, pattern, ret,
                                         NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Chunked select: sets up a root iterator, delegates to
 * db_select_chunk_tree_common(), and tears the iterator down again.
 * 'safety' is not used here.
 */
static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret,
                                  enum DbIterSafety safety)
{
    CATreeRootIterator iter;
    int status;

    init_root_iterator(&tbl->catree, &iter, 1);
    status = db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size,
                                         reversed, ret, NULL, &iter);
    destroy_root_iterator(&iter);
    return status;
}

/*
 * Resume a select_delete operation on a CA tree table from a continuation.
 *
 * Unlike the counting/reading wrappers, the root iterator is initialised
 * with flag 0 (presumably because entries are modified -- confirm against
 * init_root_iterator()) and a local, function-scoped tree stack backed by
 * a STACK_NEED sized array is handed to the shared implementation.
 * The delegate's return value is returned unchanged; safety_p is unused.
 */
static int db_select_delete_continue_catree(Process *p,
                                            DbTable *tbl,
                                            Eterm continuation,
                                            Eterm *ret,
                                            enum DbIterSafety* safety_p)
{
    TreeDbTerm *stack_slots[STACK_NEED];
    DbTreeStack tree_stack;
    CATreeRootIterator root_iter;
    int rc;

    init_root_iterator(&tbl->catree, &root_iter, 0);
    init_tree_stack(&tree_stack, stack_slots, 0);

    rc = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
                                               &tree_stack, &root_iter);
    destroy_root_iterator(&root_iter);

    return rc;
}

/*
 * Start a select_delete operation on a CA tree table.
 *
 * Sets up a root iterator with flag 0 (presumably a non-read-only
 * iteration -- confirm against init_root_iterator()) and a local tree
 * stack backed by a STACK_NEED sized array, then delegates to
 * db_select_delete_tree_common(). The iterator is destroyed before the
 * delegate's result is returned. The safety argument is not used here.
 */
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern, Eterm *ret,
                                   enum DbIterSafety safety)
{
    TreeDbTerm *stack_slots[STACK_NEED];
    DbTreeStack tree_stack;
    CATreeRootIterator root_iter;
    int rc;

    init_root_iterator(&tbl->catree, &root_iter, 0);
    init_tree_stack(&tree_stack, stack_slots, 0);

    rc = db_select_delete_tree_common(p, tbl, tid, pattern, ret,
                                      &tree_stack, &root_iter);
    destroy_root_iterator(&root_iter);

    return rc;
}

/*
 * Start a select_replace operation on a CA tree table.
 *
 * Creates a root iterator with flag 0 (presumably a non-read-only
 * iteration -- confirm against init_root_iterator()), delegates to
 * db_select_replace_tree_common() with a NULL stack and destroys the
 * iterator afterwards. The delegate's return value is returned unchanged.
 * The iteration-safety argument (passed by value despite its name) is
 * not used by this wrapper.
 */
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret,
                                    enum DbIterSafety safety_p)
{
    CATreeRootIterator root_iter;
    int rc;

    init_root_iterator(&tbl->catree, &root_iter, 0);
    rc = db_select_replace_tree_common(p, tbl, tid, pattern, ret,
                                       NULL, &root_iter);
    destroy_root_iterator(&root_iter);

    return rc;
}

/*
 * Resume a select_replace operation on a CA tree table from a
 * continuation.
 *
 * Creates a root iterator with flag 0 (presumably a non-read-only
 * iteration -- confirm against init_root_iterator()), delegates to
 * db_select_replace_continue_tree_common() with a NULL stack and destroys
 * the iterator afterwards. The delegate's return value is returned
 * unchanged; safety_p is not used by this wrapper.
 */
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret,
                                             enum DbIterSafety* safety_p)
{
    CATreeRootIterator root_iter;
    int rc;

    init_root_iterator(&tbl->catree, &root_iter, 0);
    rc = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
                                                NULL, &root_iter);
    destroy_root_iterator(&root_iter);

    return rc;
}

/*
 * Take (look up and remove) the object(s) stored under key.
 *
 * The base node responsible for key is located and write-locked via
 * find_wlock_valid_base_node(); the actual work is done by
 * db_take_tree_common() on that base node's tree root. Afterwards the
 * base node is released through wunlock_adapt_base_node(), which also
 * receives the parent and level recorded during the search.
 * Returns whatever db_take_tree_common() returned.
 */
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode found;
    DbTableCATreeNode *base;
    int rc;

    base = find_wlock_valid_base_node(catree, key, &found);
    rc = db_take_tree_common(p, tbl, &base->u.base.root, key, ret, NULL);
    wunlock_adapt_base_node(catree, base, found.parent, found.current_level);

    return rc;
}

/*
** Other interface routines (not directly coupled to one bif)
*/


/*
 * Display tree contents (for dump).
 *
 * Walks every root produced by the root iterator (created with flag 1,
 * presumably read-only -- confirm against init_root_iterator()) and
 * prints each one with db_print_tree_common(). The first root is
 * dereferenced unconditionally, i.e. the iterator is expected to always
 * yield at least one root.
 */
static void db_print_catree(fmtfn_t to, void *to_arg,
                            int show, DbTable *tbl)
{
    CATreeRootIterator root_iter;
    TreeDbTerm **rootpp;

    init_root_iterator(&tbl->catree, &root_iter, 1);
    rootpp = catree_find_first_root(&root_iter);
    for (;;) {
        db_print_tree_common(to, to_arg, show, *rootpp, tbl);
        rootpp = catree_find_next_root(&root_iter, NULL);
        if (rootpp == NULL)
            break;
    }
    destroy_root_iterator(&root_iter);
}

/*
 * Release all memory occupied by a single table.
 *
 * Repeatedly calls db_free_table_continue_catree() with the largest
 * possible budget (ERTS_SWORD_MAX) for as long as it reports a negative
 * value. Always returns 1.
 */
static int db_free_table_catree(DbTable *tbl)
{
    for (;;) {
        if (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) >= 0)
            break;
    }
    return 1;
}

/*
 * Destructor callback registered for the magic binary that holds the
 * "number of deleted items" counter. There is nothing to clean up, so
 * the argument is ignored and 1 is returned unconditionally.
 */
static int
db_catree_nr_of_items_deleted_wb_dtor(Binary *context_bin)
{
    (void) context_bin; /* intentionally unused */

    return 1;
}

/*
 * Payload of the magic binary used to report how many items a
 * delete_all_objects operation removed from a CA tree table.
 */
typedef struct {
    Uint nr_of_deleted_items; /* Number of items deleted */
} DbCATreeNrOfItemsDeletedWb;

/*
 * Create the magic binary that will carry the number of deleted items,
 * install it in the table and return a magic reference to it.
 *
 * The reference is built on the heap of process p. The counter inside
 * the binary starts at zero. After the table has been pointed at the
 * binary, its reference count is bumped with erts_refc_inctest() (the
 * second argument, 2, is presumably the minimum value expected after the
 * increment -- confirm against erts_refc_inctest()).
 */
static Eterm
create_and_install_num_of_deleted_items_wb_bin(Process *p, DbTableCATree *tb)
{
    Binary *holder;
    DbCATreeNrOfItemsDeletedWb *counter;
    Eterm *heap_top;
    Eterm magic_ref;

    holder = erts_create_magic_binary(sizeof(DbCATreeNrOfItemsDeletedWb),
                                      db_catree_nr_of_items_deleted_wb_dtor);
    counter = ERTS_MAGIC_BIN_DATA(holder);
    heap_top = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
    magic_ref = erts_mk_magic_ref(&heap_top, &MSO(p), holder);

    counter->nr_of_deleted_items = 0;
    tb->nr_of_deleted_items_wb = holder;
    erts_refc_inctest(&holder->intern.refc, 2);

    return magic_ref;
}

/*
 * Read the deleted-items counter out of the magic binary referenced by
 * mref and return it as an integer term created for process p.
 */
static Eterm db_delete_all_objects_get_nitems_from_holder_catree(Process* p,
                                                                 Eterm mref)
{
    Binary *holder;
    DbCATreeNrOfItemsDeletedWb *counter;

    holder = erts_magic_ref2bin(mref);
    counter = ERTS_MAGIC_BIN_DATA(holder);

    return erts_make_integer(counter->nr_of_deleted_items, p);
}

/*
 * Delete all objects of a CA tree table within a reduction budget.
 *
 * If no deletion is recorded as in progress (tb->deletion is zero), a
 * fresh counter holder is created and handed back through
 * *nitems_holder_wb. The table is then freed via
 * db_free_table_continue_catree(). A negative result is returned to the
 * caller immediately (apparently meaning that more work remains).
 * Otherwise the final count is copied into the holder, the table's
 * reference to the holder is released and the table is re-created empty
 * with db_create_catree().
 *
 * Returns the value produced by db_free_table_continue_catree().
 */
static SWord db_delete_all_objects_catree(Process* p,
                                          DbTable* tbl,
                                          SWord reds,
                                          Eterm* nitems_holder_wb)
{
    DbTableCATree *catree = &tbl->catree;
    DbCATreeNrOfItemsDeletedWb *counter;

    if (!catree->deletion) {
        *nitems_holder_wb =
            create_and_install_num_of_deleted_items_wb_bin(p, catree);
    }

    reds = db_free_table_continue_catree(tbl, reds);
    if (reds >= 0) {
        /* Publish the final count, then start over with an empty table */
        counter = ERTS_MAGIC_BIN_DATA(catree->nr_of_deleted_items_wb);
        counter->nr_of_deleted_items = catree->nr_of_deleted_items;
        erts_bin_release(catree->nr_of_deleted_items_wb);
        db_create_catree(p, tbl);
    }

    return reds;
}


/*
 * Call func once for every route node in the subtree rooted at node,
 * in left-to-right (in-order) sequence.
 *
 * For each route node a temporary ErlOffHeap is built whose list head is
 * the off-heap list of the route key (overhead set to 0), and func is
 * invoked with that structure and arg.
 *
 * Base nodes are skipped. This includes the case where node itself is a
 * base node (for example when the table root has not been split into
 * route nodes); the route part of the node union must not be read then.
 */
static void do_for_route_nodes(DbTableCATreeNode* node,
                               void (*func)(ErlOffHeap *, void *),
                               void *arg)
{
    ErlOffHeap tmp_offheap;

    /* A base node has no children and no route key: nothing to do */
    if (node->is_base_node)
        return;

    do_for_route_nodes(GET_LEFT(node), func, arg);

    tmp_offheap.first = node->u.route.key.oh;
    tmp_offheap.overhead = 0;
    (*func)(&tmp_offheap, arg);

    do_for_route_nodes(GET_RIGHT(node), func, arg);
}

/*
 * Apply func to all off-heap data reachable from a CA tree table.
 *
 * If the table has no root, nothing is done (the table is then asserted
 * to be marked DB_DELETE). Otherwise every tree root produced by the root
 * iterator is passed to db_foreach_offheap_tree_common(), and finally the
 * route nodes are visited with do_for_route_nodes() starting from the
 * table root.
 */
static void db_foreach_offheap_catree(DbTable *tbl,
                                      void (*func)(ErlOffHeap *, void *),
                                      void *arg)
{
    DbTableCATree *catree = &tbl->catree;
    CATreeRootIterator root_iter;
    TreeDbTerm **rootpp;

    if (!GET_ROOT(catree)) {
        ASSERT(catree->common.status & DB_DELETE);
        return;
    }

    /* Off-heap data of the stored objects */
    init_root_iterator(catree, &root_iter, 1);
    rootpp = catree_find_first_root(&root_iter);
    for (;;) {
        db_foreach_offheap_tree_common(*rootpp, func, arg);
        rootpp = catree_find_next_root(&root_iter, NULL);
        if (rootpp == NULL)
            break;
    }
    destroy_root_iterator(&root_iter);

    /* Off-heap data of the route node keys */
    do_for_route_nodes(GET_ROOT(catree), func, arg);
}

/*
 * Look up the term stored under key for an in-place update.
 *
 * The responsible base node is found and write-locked, and the lookup
 * itself is done by db_lookup_dbterm_tree_common(). When that returns 0
 * the base node is unlocked right away. For any other result the lock is
 * kept and the base node, its parent and the level are stored in handle
 * so that db_finalize_dbterm_catree() can unlock later.
 *
 * Returns the value of db_lookup_dbterm_tree_common().
 */
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                                   DbUpdateHandle *handle)
{
    DbTableCATree *catree = &tbl->catree;
    FindBaseNode found;
    DbTableCATreeNode *base;
    int rc;

    base = find_wlock_valid_base_node(catree, key, &found);
    rc = db_lookup_dbterm_tree_common(p, tbl, &base->u.base.root, key,
                                      obj, handle, NULL);
    if (rc != 0) {
        /* db_finalize_dbterm_catree will unlock */
        handle->u.catree.base_node = base;
        handle->u.catree.parent = found.parent;
        handle->u.catree.current_level = found.current_level;
    } else {
        wunlock_adapt_base_node(catree, base, found.parent,
                                found.current_level);
    }

    return rc;
}

/*
 * Finish an update started with db_lookup_dbterm_catree().
 *
 * Runs the shared finalization and then releases the base node that was
 * recorded in handle, using the parent and level stored alongside it.
 */
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
    DbTableCATree *catree = &handle->tb->catree;

    db_finalize_dbterm_tree_common(cret, handle, NULL);
    wunlock_adapt_base_node(catree,
                            handle->u.catree.base_node,
                            handle->u.catree.parent,
                            handle->u.catree.current_level);
}

#ifdef ERTS_ENABLE_LOCK_COUNT
/*
 * Enable or disable lock counting for the lock of node and, when node is
 * a route node, for all locks in both of its subtrees (children are
 * handled before the node itself).
 *
 * Base node locks are registered as rwmutexes, route node locks as
 * mutexes; both use the DB category and the table's name as identifier.
 */
static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
                                                         DbTableCATreeNode *node,
                                                         int enable)
{
    erts_lcnt_ref_t *ref;
    erts_lock_flags_t type;

    if (!node->is_base_node) {
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
        ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
        type = ERTS_LOCK_TYPE_MUTEX;
    } else {
        ref = &GET_BASE_NODE_LOCK(node)->lcnt;
        type = ERTS_LOCK_TYPE_RWMUTEX;
    }

    if (!enable) {
        erts_lcnt_uninstall(ref);
        return;
    }

    erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
                                    type | ERTS_LOCK_FLAGS_CATEGORY_DB);
}

/*
 * Enable (non-zero) or disable (zero) lock counting for every lock in the
 * CA tree of table tb, starting from the table's root node.
 */
void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
{
    DbTableCATreeNode *top = GET_ROOT(tb);

    erts_lcnt_enable_db_catree_lock_count_helper(tb, top, enable);
}
#endif /* ERTS_ENABLE_LOCK_COUNT */

/*
 * Turn forced splitting of base nodes on or off.
 *
 * For every base node the root iterator has locked while stepping
 * through the table, the statistics value is set to INT_MAX (on) or 0
 * (off). The DB_CATREE_FORCE_SPLIT bit in the table status is then set or
 * cleared accordingly.
 */
void db_catree_force_split(DbTableCATree* tb, int on)
{
    CATreeRootIterator root_iter;
    TreeDbTerm **rootpp;
    int stat_value = on ? INT_MAX : 0;

    init_root_iterator(tb, &root_iter, 1);
    rootpp = catree_find_first_root(&root_iter);
    for (;;) {
        BASE_NODE_STAT_SET(root_iter.locked_bnode, stat_value);
        rootpp = catree_find_next_root(&root_iter, NULL);
        if (rootpp == NULL)
            break;
    }
    destroy_root_iterator(&root_iter);

    if (!on)
        tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
    else
        tb->common.status |= DB_CATREE_FORCE_SPLIT;
}

/*
 * Set (on != 0) or clear (on == 0) the DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN
 * bit in the status word of table tb.
 */
void db_catree_debug_random_split_join(DbTableCATree* tb, int on)
{
    if (!on) {
        tb->common.status &= ~DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
        return;
    }
    tb->common.status |= DB_CATREE_DEBUG_RANDOM_SPLIT_JOIN;
}

/*
 * Count the route nodes and base nodes of table tb and determine the
 * maximum depth of the route node layer. The results are written to
 * *stats (route_nodes, base_nodes, max_depth); all three are reset to 0
 * first.
 *
 * The tree is walked iteratively, using a local stack of parent route
 * nodes instead of recursion. The stack holds at most
 * ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT entries (asserted in debug
 * builds).
 */
void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
{
    DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
    DbTableCATreeNode* node;
    Uint depth = 0;

    stats->route_nodes = 0;
    stats->base_nodes = 0;
    stats->max_depth = 0;

    node = GET_ROOT(tb);
    do {
        /* Descend along left children until a base node is reached */
        while (!node->is_base_node) {
            stats->route_nodes++;
            ASSERT(depth < sizeof(stack)/sizeof(*stack));
            stack[depth++] = node;  /* PUSH parent */
            if (stats->max_depth < depth)
                stats->max_depth = depth;
            node = GET_LEFT(node);
        }
        stats->base_nodes++;

        /*
         * Go back up: if we came from a left child, continue with the
         * parent's right subtree; if we came from a right child, the
         * parent is done and is popped.
         */
        while (depth > 0) {
            DbTableCATreeNode* parent = stack[depth-1];
            if (node == GET_LEFT(parent)) {
                node = GET_RIGHT(parent);
                break;
            }
            else {
                ASSERT(node == GET_RIGHT(parent));
                node = parent;
                depth--; /* POP parent */
            }
        }
    /* An empty stack here means every subtree has been visited */
    } while (depth > 0);
}

/*
 * Callback and its argument, bundled so they can be passed through a
 * single void pointer (see debug_free_route_node()).
 */
struct debug_catree_fa {
    void (*func)(ErlOffHeap *, void *); /* Function to call per off-heap list */
    void *arg;                          /* Second argument passed to func */
};

/*
 * Callback given to erts_debug_later_op_foreach().
 *
 * vnp is a route node and vfap a struct debug_catree_fa. If the route
 * node's key has a non-empty off-heap list, the list is wrapped in a
 * freshly initialised ErlOffHeap and handed to the stored function
 * together with the stored argument. val is not used.
 */
static void debug_free_route_node(void *vfap, ErtsThrPrgrVal val, void *vnp)
{
    DbTableCATreeNode *route = vnp;
    struct debug_catree_fa *cb;
    ErlOffHeap oh;

    if (!route->u.route.key.oh)
        return;

    cb = vfap;
    ERTS_INIT_OFF_HEAP(&oh);
    oh.first = route->u.route.key.oh;
    (*cb->func)(&oh, cb->arg);
}

/*
 * Invoke func(offheap, arg) for route nodes that are currently scheduled
 * for do_free_route_node via a "later op", by handing
 * debug_free_route_node() and the packed callback to
 * erts_debug_later_op_foreach().
 */
void
erts_db_foreach_thr_prgr_offheap_catree(void (*func)(ErlOffHeap *, void *),
                                        void *arg)
{
    struct debug_catree_fa callback;

    callback.arg = arg;
    callback.func = func;

    erts_debug_later_op_foreach(do_free_route_node,
                                debug_free_route_node,
                                &callback);
}


#ifdef HARDDEBUG

/*
 * Not called, but kept as it might come to use.
 *
 * Recursively verify the balance factors of the tree rooted at t:
 * for every node, (height of right subtree - height of left subtree)
 * must equal the node's stored balance. On a mismatch a diagnostic is
 * written to stderr and the emulator is aborted.
 *
 * Returns the height of the tree (0 for an empty tree).
 */
static inline int my_check_table_tree(TreeDbTerm *t)
{
    int lh, rh;
    if (t == NULL)
	return 0;
    lh = my_check_table_tree(t->left);
    rh = my_check_table_tree(t->right);
    if ((rh - lh) != t->balance) {
	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
	/* Pointers are printed with %p; an integer conversion such as %X
	 * does not match a pointer argument and truncates on 64-bit. */
	erts_fprintf(stderr,"balance = %d, left = %p, right = %p\n",
		     t->balance, (void *) t->left, (void *) t->right);
	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
	erts_fprintf(stderr,"\n---------------------------------\n");
        abort();
    }
    return ((rh > lh) ? rh : lh) + 1;
}

#endif