aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
blob: 71768adcda865f0acf2c6cc3e6687cfd7ff94dac (plain) (tree)
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742



















































































































































































































                                                                                   


                                                           





                                                                            

                                                                                                                      
                                                                            

                                                                                                                     



                                                                                  

                                                                                                                            
                                                                                  

                                                                                                                           


























































































































































































































































































































































































































































































































                                                                                     
                                                               


















































                                                                                                            
                                         










                                                                                                     


                                   





                                                           








                                                           
                                                          












                                                                       
                                           



















                                                                       
                                         
 



                                                     

 
                                                                          
 












                                                              

                                      







                                                                           

 
 










































































































                                                                             
                                                                               

                                        
                                                        







































                                                                                               
                                          





























                                                                                            
                                      
























                                                                                            
                                          










































































                                                                                                              
                                          
























                                                                        
                                                          








                                                           
                                        

                                                                                       
                                                         

















                                                            
                                                  
















                                                                       
                                    
                                                    
                                                           
















                                                                           















                                                               





























                                                                    

                                                            
                           
                                                         






















                                                                                   
                                                          








                                               
                                        

                                                                                       
                                                         






















                                                                    
                                                          
















                                                                               
                                            










                                                                                       
                                                         






















                                                                    
                                                          
















                                                                              
                                            




















                                                                            














                                                               











                                                             
                                                          



                                   
                                             

















                                                      

                                                      












                                                                                      




                                                         










                                               
                                      









                                                                    
                                                         





































                                                                                 
                                                 







































                                                                      
                                                   

                                                                    
                                                                           

































                                                                
                                                                                              









































                                                                                                                                       

                                                                           












                                                                          

                                                                              







































































































                                                                                              
                                                           
                                                       
                                       











                                                        
                                                                               
                                                                     
                                       










                                                                   
                                                                      

                                                              
                                       












                                                                 
                                                                     
                                                                           
                                       









                                                                      
                                                                            
                                                                  
                                       










                                                                      
                                                                            
                                                                                        
                                       














                                                               
                                                                              
                                                                              
                                       












                                                                       
                                                                     
                                                                     
                                       










                                                                        
                                                              
                                                                    
                                       










                                                                            
                                                                       
                                                                             
                                       


























                                                                                   

                                                                   






































                                                                              
                                                                           




































                                                                               

                                                                 

























                                                                                                      
                                                             



























































                                                                                      
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Implementation of ETS ordered_set table type with
 *              fine-grained synchronization.
 *
 * Author: 	Kjell Winblad
 *
 * This implementation is based on the contention adapting search tree
 * (CA tree). The CA tree is a concurrent data structure that
 * dynamically adapts its synchronization granularity based on how
 * much contention is detected in locks. The following publication
 * contains a detailed description of CA trees:
 * 
 * A Contention Adapting Approach to Concurrent Ordered Sets
 * Journal of Parallel and Distributed Computing, 2018
 * Kjell Winblad and Konstantinos Sagonas
 * https://doi.org/10.1016/j.jpdc.2017.11.007
 *
 * The following publication may also be interesting as it discusses
 * how the CA tree can be used as an ETS ordered_set table type
 * backend:
 *
 * More Scalable Ordered Set for ETS Using Adaptation
 * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
 * Kjell Winblad and Konstantinos Sagonas
 * https://doi.org/10.1145/2633448.2633455
 *
 * This implementation of the ordered_set ETS table type is only
 * activated when the options {write_concurrency, true}, public and
 * ordered_set are passed to the ets:new/2 function. This
 * implementation is expected to scale better than the default
 * implementation (located in "erl_db_tree.c") when concurrent
 * processes use the following ETS operations to operate on a table:
 * 
 * delete/2, delete_object/2, first/1, insert/2 (single object),
 * insert_new/2 (single object), lookup/2, lookup_element/2, member/2,
 * next/2, take/2 and update_element/3 (single object).
 *
 * Currently, the implementation does not have scalable support for
 * the other operations (e.g., select/2). These operations are handled
 * by merging all locks so that all terms get protected by a single
 * lock. This implementation may thus perform worse than the default
 * implementation in some scenarios. For example, when concurrent
 * processes access a table with the operations insert/2, delete/2 and
 * select/2, the insert/2 and delete/2 operations will trigger splits
 * of locks (to get more fine-grained synchronization) but this will
 * quickly be undone by the select/2 operation if this operation is
 * also called frequently.
 *
 * The default implementation has a static stack optimization (see
 * get_static_stack in erl_db_tree.c). This implementation does not
 * have such an optimization as it induces bad scalability when
 * concurrent read operations are frequent (they all try to get hold
 * of the same stack). The default implementation may thus perform
 * better compared to this implementation in scenarios where the
 * static stack optimization is useful. One such scenario is when only
 * one process is accessing the table and this process is traversing
 * the table with a sequence of next/2 calls.
 */

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_process.h"
#include "error.h"
#define ERTS_WANT_DB_INTERNAL__
#include "erl_db.h"
#include "bif.h"
#include "big.h"
#include "erl_binary.h"

#include "erl_db_catree.h"
#include "erl_db_tree.h"
#include "erl_db_tree_util.h"

/*
** Forward declarations
*/

static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left);
static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left);
static DbTableCATreeNode *catree_first_base_node_from_free_list(DbTableCATree *tb);

/* Method interface functions */
static int db_first_catree(Process *p, DbTable *tbl,
                           Eterm *ret);
static int db_next_catree(Process *p, DbTable *tbl,
                          Eterm key, Eterm *ret);
static int db_last_catree(Process *p, DbTable *tbl,
                          Eterm *ret);
static int db_prev_catree(Process *p, DbTable *tbl,
                          Eterm key,
                          Eterm *ret);
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail);
static int db_get_catree(Process *p, DbTable *tbl,
                         Eterm key,  Eterm *ret);
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_get_element_catree(Process *p, DbTable *tbl,
                                 Eterm key,int ndex,
                                 Eterm *ret);
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term,  Eterm *ret);
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reversed, Eterm *ret);
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern,  Eterm *ret);
static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret);
static int db_select_continue_catree(Process *p, DbTable *tbl,
                                     Eterm continuation, Eterm *ret);
static int db_select_count_continue_catree(Process *p, DbTable *tbl,
                                           Eterm continuation, Eterm *ret);
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern,  Eterm *ret);
static int db_select_delete_continue_catree(Process *p, DbTable *tbl, 
                                            Eterm continuation, Eterm *ret);
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret);
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret);
static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_catree(fmtfn_t to, void *to_arg,
                            int show, DbTable *tbl);
static int db_free_table_catree(DbTable *tbl);
static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
static void db_foreach_offheap_catree(DbTable *,
                                      void (*)(ErlOffHeap *, void *),
                                      void *);
static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds);
static int
db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
                        DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);

/*
** External interface
*/
DbTableMethod db_catree =
{
    db_create_catree,
    db_first_catree,
    db_next_catree,
    db_last_catree,
    db_prev_catree,
    db_put_catree,
    db_get_catree,
    db_get_element_catree,
    db_member_catree,
    db_erase_catree,
    db_erase_object_catree,
    db_slot_catree,
    db_select_chunk_catree,
    db_select_catree,
    db_select_delete_catree,
    db_select_continue_catree,
    db_select_delete_continue_catree,
    db_select_count_catree,
    db_select_count_continue_catree,
    db_select_replace_catree,
    db_select_replace_continue_catree,
    db_take_catree,
    db_delete_all_objects_catree,
    db_free_table_catree,
    db_free_table_continue_catree,
    db_print_catree,
    db_foreach_offheap_catree,
    db_lookup_dbterm_catree,
    db_finalize_dbterm_catree

};

/*
 * Constants
 */

#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 200
#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 14

/*
 * Internal CA tree related helper functions and macros
 */

#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.tpl[0])
#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))


/* Helpers for reading and writing shared atomic variables */

/* No memory barrier */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));


/* Release or acquire barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));

/* Compares a key to the key in a route node */
static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
                                      Eterm key,
                                      DbTableCATreeNode *obj)
{
    return CMP(key, GET_ROUTE_NODE_KEY(obj));
}

static ERTS_INLINE void push_node_dyn_array(DbTable *tb,
                                            CATreeNodeStack *stack,
                                            DbTableCATreeNode *node)
{
    int i;
    if (stack->pos == stack->size) {
        DbTableCATreeNode **newArray =
            erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
                          sizeof(DbTableCATreeNode*) * (stack->size*2));
        for (i = 0; i < stack->pos; i++) {
            newArray[i] = stack->array[i];
        }
        if (stack->size > STACK_NEED) {
            /* Dynamically allocated array that needs to be deallocated */
            erts_db_free(ERTS_ALC_T_DB_STK, tb,
                         stack->array,
                         sizeof(DbTableCATreeNode *) * stack->size);
        }
        stack->array = newArray;
        stack->size = stack->size*2;
    }
    PUSH_NODE(stack, node);
}

/*
 * Used by the split_tree function
 */
static ERTS_INLINE
int less_than_two_elements(TreeDbTerm *root)
{
    return root == NULL || (root->left == NULL && root->right == NULL);
}

/*
 * Inserts a TreeDbTerm into a tree. Returns the new root.
 */
static ERTS_INLINE
TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
                              TreeDbTerm *insert_to_root,
                              TreeDbTerm *value_to_insert) {
    /* Non recursive insertion in AVL tree, building our own stack */
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm * base = insert_to_root;
    TreeDbTerm **this = &base;
    Sint c;
    Eterm key;
    int dir;
    TreeDbTerm *p1, *p2, *p;

    key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);

    dstack[dpos++] = DIR_END;
    for (;;)
	if (!*this) { /* Found our place */
	    state = 1;
	    *this = value_to_insert;
	    (*this)->balance = 0;
	    (*this)->left = (*this)->right = NULL;
	    break;
	} else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
	    /* go lefts */
	    dstack[dpos++] = DIR_LEFT;
	    tstack[tpos++] = this;
	    this = &((*this)->left);
	} else { /* go right */
	    dstack[dpos++] = DIR_RIGHT;
	    tstack[tpos++] = this;
	    this = &((*this)->right);
	}

    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
	this = tstack[--tpos];
	p = *this;
	if (dir == DIR_LEFT) {
	    switch (p->balance) {
	    case 1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = -1;
		break;
	    case -1: /* The icky case */
		p1 = p->left;
		if (p1->balance == -1) { /* Single LL rotation */
		    p->left = p1->right;
		    p1->right = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RR rotation */
		    p2 = p1->right;
		    p1->right = p2->left;
		    p2->left = p1;
		    p->left = p2->right;
		    p2->right = p;
		    p->balance = (p2->balance == -1) ? +1 : 0;
		    p1->balance = (p2->balance == 1) ? -1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	} else { /* dir == DIR_RIGHT */
	    switch (p->balance) {
	    case -1:
		p->balance = 0;
		state = 0;
		break;
	    case 0:
		p->balance = 1;
		break;
	    case 1:
		p1 = p->right;
		if (p1->balance == 1) { /* Single RR rotation */
		    p->right = p1->left;
		    p1->left = p;
		    p->balance = 0;
		    (*this) = p1;
		} else { /* Double RL rotation */
		    p2 = p1->left;
		    p1->left = p2->right;
		    p2->right = p1;
		    p->right = p2->left;
		    p2->left = p;
		    p->balance = (p2->balance == 1) ? -1 : 0;
		    p1->balance = (p2->balance == -1) ? 1 : 0;
		    (*this) = p2;
		}
		(*this)->balance = 0;
		state = 0;
		break;
	    }
	}
    }
    return base;
}

/*
 * Split an AVL tree into two trees. The function stores the node
 * containing the "split key" in the write back parameter
 * split_key_wb. The function stores the left tree containing the keys
 * that are smaller than the "split key" in the write back parameter
 * left_wb and the tree containing the rest of the keys in the write
 * back parameter right_wb.
 */
static void split_tree(DbTableCommon *tb,
                       TreeDbTerm *root,
                       TreeDbTerm **split_key_node_wb,
                       TreeDbTerm **left_wb,
                       TreeDbTerm **right_wb) {
    TreeDbTerm * split_node = NULL;
    TreeDbTerm * left_root;
    TreeDbTerm * right_root;
    if (root->left == NULL) { /* To get non empty split */
        *right_wb = root->right;
        *split_key_node_wb = root->right;
        root->right = NULL;
        root->balance = 0;
        *left_wb = root;
        return;
    }
    split_node = root;
    left_root = split_node->left;
    split_node->left = NULL;
    right_root = split_node->right;
    split_node->right = NULL;
    right_root = insert_TreeDbTerm(tb,
                                   right_root,
                                   split_node);
    *split_key_node_wb = split_node;
    *left_wb = left_root;
    *right_wb = right_root;
}

/*
 * Used by the join_trees function
 */
static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
{
    if(root == NULL) {
        return 0;
    } else {
        TreeDbTerm * current_node = root;
        int hight_so_far = 1;
        while (current_node->left != NULL || current_node->right != NULL) {
            if (current_node->balance == -1) {
                current_node = current_node->left;
            } else {
                current_node = current_node->right;
            }
            hight_so_far = hight_so_far + 1;
        }
        return hight_so_far;
    }
}

/*
 * Used by the join_trees function
 */
static ERTS_INLINE
TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 0;
    TreeDbTerm **this = root;
    int dir;
    TreeDbTerm *q = NULL;

    dstack[dpos++] = DIR_END;
    for (;;) {
        if (!*this) { /* Failure */
            return NULL;
        } else if (is_min && (*this)->left != NULL) {
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
        } else if (!is_min && (*this)->right != NULL) {
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
        } else { /* Min value, found the one to splice out */
            q = (*this);
            if (q->right == NULL) {
                (*this) = q->left;
                state = 1;
            } else if (q->left == NULL) {
                (*this) = q->right;
                state = 1;
            }
            break;
        }
    }
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        if (dir == DIR_LEFT) {
            state = tree_balance_left(this);
        } else {
            state = tree_balance_right(this);
        }
    }
    return q;
}

#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)

/*
 * Joins two AVL trees where all the keys in the left one are smaller
 * then the keys in the right one and returns the resulting tree.
 *
 * The algorithm is described on page 474 in D. E. Knuth. The Art of
 * Computer Programming: Sorting and Searching,
 * vol. 3. Addison-Wesley, 2nd edition, 1998.
 */
static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
                               TreeDbTerm *right_root_param)
{
    TreeDbTerm **tstack[STACK_NEED];
    int tpos = 0;
    int dstack[STACK_NEED+1];
    int dpos = 0;
    int state = 1;
    TreeDbTerm **this;
    int dir;
    TreeDbTerm *p1, *p2, *p;
    TreeDbTerm *left_root = left_root_param;
    TreeDbTerm *right_root = right_root_param;
    int left_height;
    int right_height;
    int current_height;
    dstack[dpos++] = DIR_END;
    if (left_root == NULL) {
        return right_root;
    } else if (right_root == NULL) {
        return left_root;
    }

    left_height = compute_tree_hight(left_root);
    right_height = compute_tree_hight(right_root);
    if (left_height >= right_height) {
        TreeDbTerm * new_root =
            LINKOUT_MIN_TREE_NODE(&right_root);
        int new_right_height = compute_tree_hight(right_root);
        TreeDbTerm * current_node = left_root;
        this = &left_root;
        current_height = left_height;
        while(current_height > new_right_height + 1) {
            if (current_node->balance == -1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_RIGHT;
            tstack[tpos++] = this;
            this = &((*this)->right);
            current_node = current_node->right;
        }
        new_root->left = current_node;
        new_root->right = right_root;
        new_root->balance = new_right_height - current_height;
        *this = new_root;
    } else {
        /* This case is symmetric to the previous case */
        TreeDbTerm * new_root =
            LINKOUT_MAX_TREE_NODE(&left_root);
        int new_left_height = compute_tree_hight(left_root);
        TreeDbTerm * current_node = right_root;
        this = &right_root;
        current_height = right_height;
        while (current_height > new_left_height + 1) {
            if (current_node->balance == 1) {
                current_height = current_height - 2;
            } else {
                current_height = current_height - 1;
            }
            dstack[dpos++] = DIR_LEFT;
            tstack[tpos++] = this;
            this = &((*this)->left);
            current_node = current_node->left;
        }
        new_root->right = current_node;
        new_root->left = left_root;
        new_root->balance = current_height - new_left_height;
        *this = new_root;
    }
    /* Now we need to continue as if this was during the insert */
    while (state && ( dir = dstack[--dpos] ) != DIR_END) {
        this = tstack[--tpos];
        p = *this;
        if (dir == DIR_LEFT) {
            switch (p->balance) {
                case 1:
                p->balance = 0;
                state = 0;
                break;
                case 0:
                p->balance = -1;
                break;
                case -1: /* The icky case */
                p1 = p->left;
                if (p1->balance == -1) { /* Single LL rotation */
                    p->left = p1->right;
                    p1->right = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double RR rotation */
                    p2 = p1->right;
                    p1->right = p2->left;
                    p2->left = p1;
                    p->left = p2->right;
                    p2->right = p;
                    p->balance = (p2->balance == -1) ? +1 : 0;
                    p1->balance = (p2->balance == 1) ? -1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        } else { /* dir == DIR_RIGHT */
            switch (p->balance) {
                case -1:
                p->balance = 0;
                state = 0;
                break;
                case 0:
                p->balance = 1;
                break;
                case 1:
                p1 = p->right;
                if (p1->balance == 1) { /* Single RR rotation */
                    p->right = p1->left;
                    p1->left = p;
                    p->balance = 0;
                    (*this) = p1;
                } else { /* Double RL rotation */
                    p2 = p1->left;
                    p1->left = p2->right;
                    p2->right = p1;
                    p->right = p2->left;
                    p2->left = p;
                    p->balance = (p2->balance == 1) ? -1 : 0;
                    p1->balance = (p2->balance == -1) ? 1 : 0;
                    (*this) = p2;
                }
                (*this)->balance = 0;
                state = 0;
                break;
            }
        }
    }
    /* Return the joined tree */
    if (left_height >= right_height) {
        return left_root;
    } else {
        return right_root;
    }
}


static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
{
    return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock);
}

/*
 * Locks a base node without adjusting the lock statistics
 */
static ERTS_INLINE
void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
{
    erts_rwmtx_rwlock(&base_node->lock);
}

/*
 * Locks a base node and adjusts the lock statistics according to if
 * the lock was contended or not
 */
static ERTS_INLINE
void wlock_base_node(DbTableCATreeBaseNode *base_node)
{
    if (try_wlock_base_node(base_node)) {
        /* The lock is contended */
        wlock_base_node_no_stats(base_node);
        base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
    } else {
        base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
    }
}

static ERTS_INLINE
void wunlock_base_node(DbTableCATreeBaseNode *base_node)
{
    erts_rwmtx_rwunlock(&base_node->lock);
}

static ERTS_INLINE
void rlock_base_node(DbTableCATreeBaseNode *base_node)
{
    erts_rwmtx_rlock(&base_node->lock);
}

static ERTS_INLINE
void runlock_base_node(DbTableCATreeBaseNode *base_node)
{
    erts_rwmtx_runlock(&base_node->lock);
}

static ERTS_INLINE
void lock_route_node(DbTableCATreeRouteNode *route_node)
{
    erts_mtx_lock(&route_node->lock);
}

static ERTS_INLINE
void unlock_route_node(DbTableCATreeRouteNode *route_node)
{
    erts_mtx_unlock(&route_node->lock);
}


/*
 * The following macros are used to create the ETS functions that only
 * need to access one element (e.g. db_put_catree, db_get_catree and
 * db_erase_catree).
 */

#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
        int retry;                                                      \
        DbTableCATreeNode *current_node;                                \
        DbTableCATreeNode *prev_node;                                   \
        DbTableCommon* common_table_data = &tb->common;                 \
        DbTableCATreeBaseNode *base_node;                               \
        int current_level;                                              \
        (void)prev_node;                                                \
        do {                                                            \
            retry = 0;                                                  \
            current_node = GET_ROOT_ACQB(tb);                           \
            prev_node = NULL;                                           \
            current_level = 0;                                          \
            while ( ! current_node->is_base_node ) {                    \
                current_level = current_level + 1;                      \
                prev_node = current_node;                               \
                if (cmp_key_route(common_table_data,key,current_node) < 0) { \
                    current_node = GET_LEFT_ACQB(current_node);         \
                } else {                                                \
                    current_node = GET_RIGHT_ACQB(current_node);        \
                }                                                       \
            }                                                           \
            base_node = &current_node->u.base;                \
            LOCK(base_node);                                            \
            if ( ! base_node->is_valid ) {                              \
                /* Retry */                                             \
                UNLOCK(base_node);                                      \
                retry = 1;                                              \
            }                                                           \
        } while(retry);                                                 


#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
    if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
        && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
        split_catree(&tb->common, prev_node, current_node);             \
    } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
        join_catree(tb, prev_node, current_node);                       \
    } else {                                                            \
        wunlock_base_node(base_node);                                    \
    }

#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
    static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
                                                        Eterm key,      \
                                                        PARAMETERS){    \
        int result;                                                     \
        ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node)  \
        result = SEQUENTAIL_CALL;                                   \
        ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
        return result;                                                  \
    }


#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
    static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
                                                        Eterm key,      \
                                                        PARAMETERS){    \
        int result;                                                     \
        ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
        result = SEQUENTAIL_CALL;                                       \
        runlock_base_node(base_node);                                   \
        return result;                                                  \
    }



static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
{
    DbTableCATreeNode *new_base_node_container =
        erts_db_alloc(ERTS_ALC_T_DB_TABLE,
                      (DbTable *) tb,
                      sizeof(DbTableCATreeNode));
    DbTableCATreeBaseNode *new_base_node =
        &new_base_node_container->u.base;
    erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
    new_base_node_container->is_base_node = 1;
    new_base_node->root = NULL;
    if (tb->common.type & DB_FREQ_READ)
        rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
    if (erts_ets_rwmtx_spin_count >= 0)
        rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
    erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
                        "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
    new_base_node->lock_statistics = 0;
    new_base_node->is_valid = 1;
    return new_base_node_container;
}

static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
{
    return (offsetof(DbTableCATreeNode, u.route.key.tpl[1])
            + key_size*sizeof(Eterm));
}

static DbTableCATreeNode*
create_catree_route_node(DbTableCommon * common_table_data,
                         DbTableCATreeNode *left,
                         DbTableCATreeNode *right,
                         DbTerm * keyTerm)
{
    Eterm* top;
    Eterm key = GETKEY(common_table_data,keyTerm->tpl);
    int key_size = size_object(key);
    Uint offset = offsetof(DbTableCATreeNode,u.route.key);
    size_t route_node_container_size =
        offset +
        sizeof(DbTerm) +
        sizeof(Eterm)*key_size;
    ErlOffHeap tmp_offheap;
    byte* new_route_node_container_bytes =
        erts_db_alloc(ERTS_ALC_T_DB_TABLE,
                      (DbTable *) common_table_data,
                      route_node_container_size);
    DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
    DbTableCATreeNode *new_route_node_container =
        (DbTableCATreeNode*)new_route_node_container_bytes;
    DbTableCATreeRouteNode *new_route_node =
        &new_route_node_container->u.route;
    if (key_size != 0) {
        newp->size = key_size;
        top = &newp->tpl[1];
        tmp_offheap.first  = NULL;
        newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
        newp->first_oh = tmp_offheap.first;
    } else {
        newp->size = key_size;
        newp->first_oh = NULL;
        newp->tpl[0] = key;
    }
    new_route_node_container->is_base_node = 0;
    new_route_node->is_valid = 1;
    erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
    erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
    erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
                  NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
    return new_route_node_container;
}

static void do_free_base_node(void* vptr)
{
    DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
    ASSERT(p->is_base_node);
    erts_rwmtx_destroy(&p->u.base.lock);
    erts_free(ERTS_ALC_T_DB_TABLE, p);
}

static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    ASSERT(p->is_base_node);
    ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof(DbTableCATreeNode), 0);
    do_free_base_node(p);
}

static void do_free_route_node(void *vptr)
{
    DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
    ASSERT(!p->is_base_node);
    erts_mtx_destroy(&p->u.route.lock);
    if (p->u.route.key.size != 0) {
        ErlOffHeap tmp_oh;
        tmp_oh.first = p->u.route.key.first_oh;
        erts_cleanup_offheap(&tmp_oh);
    }
    erts_free(ERTS_ALC_T_DB_TABLE, p);
}

static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
    ASSERT(!p->is_base_node);
    ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
    do_free_route_node(p);
}


/*
 * Returns the parent routing node of the specified
 * route_node_container if such a routing node exists or NULL if
 * route_node_container is attached to the root
 */
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
          DbTableCATreeNode *route_node_container)
{

    Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
    DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
    DbTableCATreeNode *prev_node = NULL;
    if (current_node == route_node_container) {
        return NULL;
    }
    while (current_node != route_node_container) {
        prev_node = current_node;
        if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
            current_node = GET_LEFT_ACQB(current_node);
        } else {
            current_node = GET_RIGHT_ACQB(current_node);
        }
    }
    return prev_node;
}


static ERTS_INLINE DbTableCATreeNode *
leftmost_base_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *node = root;
    while (!node->is_base_node) {
        node = GET_LEFT_ACQB(node);
    }
    return node;
}


static ERTS_INLINE DbTableCATreeNode *
rightmost_base_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *node = root;
    while (!node->is_base_node) {
        node = GET_RIGHT_ACQB(node);
    }
    return node;
}


static ERTS_INLINE DbTableCATreeNode *
leftmost_route_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode *node = root;
    DbTableCATreeNode *prev_node = NULL;
    while (!node->is_base_node) {
        prev_node = node;
        node = GET_LEFT_ACQB(node);
    }
    if (prev_node == NULL) {
        return NULL;
    } else {
        return prev_node;
    }
}

static ERTS_INLINE DbTableCATreeNode*
rightmost_route_node(DbTableCATreeNode *root)
{
    DbTableCATreeNode * node = root;
    DbTableCATreeNode * prev_node = NULL;
    while (!node->is_base_node) {
        prev_node = node;
        node = GET_RIGHT_ACQB(node);
    }
    if (prev_node == NULL) {
        return NULL;
    } else {
        return prev_node;
    }
}

static ERTS_INLINE DbTableCATreeNode*
leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
{
    DbTableCATreeNode * node = root;
    while (!node->is_base_node) {
        PUSH_NODE(stack, node);
        node = GET_LEFT_ACQB(node);
    }
    return node;
}

static ERTS_INLINE DbTableCATreeNode*
get_next_base_node_and_path(DbTableCommon *common_table_data,
                            DbTableCATreeNode *base_node,
                            CATreeNodeStack *stack)
{
    if (EMPTY_NODE(stack)) { /* The parent of b is the root */
        return NULL;
    } else {
        if (GET_LEFT(TOP_NODE(stack)) == base_node) {
            return leftmost_base_node_and_path(
                        GET_RIGHT_ACQB(TOP_NODE(stack)),
                        stack);
        } else {
            Eterm pkey =
                TOP_NODE(stack)->u.route.key.tpl[0]; /* pKey = key of parent */
            POP_NODE(stack);
            while (!EMPTY_NODE(stack)) {
                if (TOP_NODE(stack)->u.route.is_valid &&
                   cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
                    return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
                } else {
                  POP_NODE(stack);
                }
            }
        }
        return NULL;
    }
}

static ERTS_INLINE void
clone_stack(CATreeNodeStack *search_stack_ptr,
            CATreeNodeStack *search_stack_copy_ptr)
{
    int i;
    search_stack_copy_ptr->pos = search_stack_ptr->pos;
    for (i = 0; i < search_stack_ptr->pos; i++) {
        search_stack_copy_ptr->array[i] = search_stack_ptr->array[i];
    }
}



static ERTS_INLINE DbTableCATreeNode*
lock_first_base_node(DbTable *tbl,
                     CATreeNodeStack *search_stack_ptr,
                     CATreeNodeStack *locked_base_nodes_stack_ptr)
{
    int retry;
    DbTableCATreeNode *current_node;
    DbTableCATreeBaseNode *base_node;
    DbTableCATree* tb = &tbl->catree;
    do {
        retry = 0;
        current_node = GET_ROOT_ACQB(tb);
        while ( ! current_node->is_base_node ) {
            PUSH_NODE(search_stack_ptr, current_node);
            current_node = GET_LEFT_ACQB(current_node);
        }
        base_node = &current_node->u.base;
        rlock_base_node(base_node);
        if ( ! base_node->is_valid ) {
            /* Retry */
            runlock_base_node(base_node);
            retry = 1;
        }
    } while(retry);
    push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
    return current_node;
}

static ERTS_INLINE DbTableCATreeBaseNode*
find_and_lock_next_base_node_and_path(DbTable *tbl,
                                      CATreeNodeStack **search_stack_ptr_ptr,
                                      CATreeNodeStack **search_stack_copy_ptr_ptr,
                                      CATreeNodeStack *locked_base_nodes_stack_ptr)
{
    DbTableCATreeNode *current_node;
    DbTableCATreeBaseNode *base_node;
    CATreeNodeStack * tmp_stack_ptr;
    DbTableCommon* common_table_data;
 retry_find_and_lock_next_base_node:
    current_node = TOP_NODE(locked_base_nodes_stack_ptr);
    common_table_data = &tbl->common;
    clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
    current_node =
        get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
    if (current_node == NULL) {
        return NULL;
    }
    base_node = &current_node->u.base;
    rlock_base_node(base_node);
    if ( ! base_node->is_valid ) {
        /* Retry */
        runlock_base_node(base_node);
        /* Revert to previous state */
        current_node = TOP_NODE(locked_base_nodes_stack_ptr);
        tmp_stack_ptr = *search_stack_ptr_ptr;
        *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
        *search_stack_copy_ptr_ptr = tmp_stack_ptr;
        goto retry_find_and_lock_next_base_node;
    } else {
        push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
    }
    return base_node;
}

static ERTS_INLINE
void unlock_and_release_locked_base_node_stack(DbTable *tbl,
                                               CATreeNodeStack *locked_base_nodes_stack_ptr)
{
    DbTableCATreeNode *current_node;
    DbTableCATreeBaseNode *base_node;
    int i;
    for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
        current_node = locked_base_nodes_stack_ptr->array[i];
        base_node = &current_node->u.base;
        if (locked_base_nodes_stack_ptr->pos > 1) {
            base_node->lock_statistics =     /* This is not atomic which is fine as */
                base_node->lock_statistics + /* correctness does not depend on that. */
                ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION;
        }
        runlock_base_node(base_node);
    }
    if (locked_base_nodes_stack_ptr->size > STACK_NEED) {
        erts_db_free(ERTS_ALC_T_DB_STK, tbl,
                     locked_base_nodes_stack_ptr->array,
                     sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size);
    }
}

static ERTS_INLINE
void init_stack(CATreeNodeStack *stack,
                DbTableCATreeNode **stack_array,
                Uint init_size)
{
    stack->array = stack_array;
    stack->pos = 0;
    stack->size = init_size;
}

static ERTS_INLINE
void init_tree_stack(DbTreeStack *stack,
                     TreeDbTerm **stack_array,
                     Uint init_slot)
{
    stack->array = stack_array;
    stack->pos = 0;
    stack->slot = init_slot;
}

#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \
    CATreeNodeStack STACK_NAME; \
    CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \
    DbTableCATreeNode *STACK_NAME##_array[STACK_NEED];

#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \
DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \
DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \
DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \
init_stack(&search_stack, search_stack_array, 0); \
init_stack(&search_stack_copy, search_stack_copy_array, 0); \
init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/

/*
 * Locks and returns the base node that contains the specified key if
 * it is present. The function saves the search path to the found base
 * node in search_stack_ptr and adds the found base node to
 * locked_base_nodes_stack_ptr.
 */
static ERTS_INLINE DbTableCATreeBaseNode *
lock_base_node_with_key(DbTable *tbl,
                        Eterm key,
                        CATreeNodeStack * search_stack_ptr,
                        CATreeNodeStack * locked_base_nodes_stack_ptr)
{
    int retry;
    DbTableCATreeNode *current_node;
    DbTableCATreeBaseNode *base_node;
    DbTableCATree* tb = &tbl->catree;
    DbTableCommon* common_table_data = &tbl->common;
    do {
        retry = 0;
        current_node = GET_ROOT_ACQB(tb);
        while ( ! current_node->is_base_node ) {
            PUSH_NODE(search_stack_ptr, current_node);
            if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
                current_node = GET_LEFT_ACQB(current_node);
            } else {
                current_node = GET_RIGHT_ACQB(current_node);
            }
        }
        base_node = &current_node->u.base;
        rlock_base_node(base_node);
        if ( ! base_node->is_valid ) {
            /* Retry */
            runlock_base_node(base_node);
            retry = 1;
        }
    } while(retry);
    push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
    return base_node;
}

/*
 * Joins a base node with it's right neighbor. Returns the base node
 * resulting from the join in locked state or NULL if there is no base
 * node to join with.
 */
static DbTableCATreeNode*
erl_db_catree_force_join_right(DbTableCommon *common_table_data,
                               DbTableCATreeNode *parent_container,
                               DbTableCATreeNode *base_container,
                               DbTableCATreeNode **result_parent_wb)
{
    DbTableCATreeRouteNode *parent;
    DbTableCATreeNode *gparent_container;
    DbTableCATreeRouteNode *gparent;
    DbTableCATreeBaseNode *base = &base_container->u.base;
    DbTableCATree *tb = (DbTableCATree *)common_table_data;
    DbTableCATreeNode *neighbor_base_container;
    DbTableCATreeBaseNode *neighbor_base;
    DbTableCATreeNode *new_neighbor_base;
    DbTableCATreeNode *neighbor_base_parent;
    int neighbour_not_valid;
    if (parent_container == NULL) {
        return NULL;
    }
    parent = &parent_container->u.route;
    do {
        neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
        neighbor_base = &neighbor_base_container->u.base;
        wlock_base_node_no_stats(neighbor_base);
        neighbour_not_valid = !neighbor_base->is_valid;
        if (neighbour_not_valid) {
            wunlock_base_node(neighbor_base);
        }
    } while (neighbour_not_valid);
    lock_route_node(parent);
    parent->is_valid = 0;
    neighbor_base->is_valid = 0;
    base->is_valid = 0;
    gparent = NULL;
    gparent_container = NULL;
    do {
        if (gparent != NULL) {
            unlock_route_node(gparent);
        }
        gparent_container = parent_of(tb, parent_container);
        if (gparent_container != NULL) {
            gparent = &gparent_container->u.route;
            lock_route_node(gparent);
        } else {
            gparent = NULL;
        }
    } while (gparent != NULL && !gparent->is_valid);
    if (gparent == NULL) {
        SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
    } else if (GET_LEFT(gparent_container) == parent_container) {
        SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
    } else {
        SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
    }
    unlock_route_node(parent);
    if (gparent != NULL) {
        unlock_route_node(gparent);
    }
    new_neighbor_base = create_catree_base_node(tb);
    new_neighbor_base->u.base.root =
        join_trees(base->root, neighbor_base->root);
    wlock_base_node_no_stats(&(new_neighbor_base->u.base));
    neighbor_base_parent = NULL;
    if (GET_RIGHT(parent_container) == neighbor_base_container) {
        neighbor_base_parent = gparent_container;
    } else {
        neighbor_base_parent =
            leftmost_route_node(GET_RIGHT(parent_container));
    }
    if(neighbor_base_parent == NULL) {
        SET_ROOT_RELB(tb, new_neighbor_base);
    } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
        SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
    } else {
        SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
    }
    wunlock_base_node(base);
    wunlock_base_node(neighbor_base);
    /* Free the parent and base */
    erts_schedule_db_free(common_table_data,
                          do_free_route_node,
                          parent_container,
                          &parent->free_item,
                          sizeof_route_node(parent->key.size));
    erts_schedule_db_free(common_table_data,
                          do_free_base_node,
                          base_container,
                          &base->free_item,
                          sizeof(DbTableCATreeNode));
    erts_schedule_db_free(common_table_data,
                          do_free_base_node,
                          neighbor_base_container,
                          &neighbor_base->free_item,
                          sizeof(DbTableCATreeNode));

    if (parent_container == neighbor_base_container) {
        *result_parent_wb = gparent_container;
    } else {
        *result_parent_wb = neighbor_base_parent;
    }
    return new_neighbor_base;
}

/*
 * Used to merge together all base nodes for operations such as last
 * and select_*. Returns the base node resulting from the merge in
 * locked state.
 */
static DbTableCATreeNode *
merge_to_one_locked_base_node(DbTableCommon * common_table_data)
{
    DbTableCATreeNode *parent_container;
    DbTableCATreeNode *new_parent_container;
    DbTableCATree *tb = (DbTableCATree *)common_table_data;
    DbTableCATreeNode *base_container;
    DbTableCATreeNode *new_base_container;
    int is_not_valid;
    /* Find first base node */
    do {
        parent_container = NULL;
        base_container = GET_ROOT_ACQB(tb);
        while ( ! base_container->is_base_node ) {
            parent_container = base_container;
            base_container = GET_LEFT_ACQB(base_container);
        }
        wlock_base_node_no_stats(&(base_container->u.base));
        is_not_valid = ! base_container->u.base.is_valid;
        if (is_not_valid) {
            wunlock_base_node(&(base_container->u.base));
        }
    } while(is_not_valid);
    do {
        new_base_container = erl_db_catree_force_join_right(common_table_data,
                                                            parent_container,
                                                            base_container,
                                                            &new_parent_container);
        if (new_base_container != NULL) {
            base_container = new_base_container;
            parent_container = new_parent_container;
        }
    } while(new_base_container != NULL);
    return base_container;
}


static void join_catree(DbTableCATree *tb,
                        DbTableCATreeNode *parent_container,
                        DbTableCATreeNode *base_container)
{
    DbTableCATreeRouteNode *parent;
    DbTableCATreeNode *gparent_container;
    DbTableCATreeRouteNode *gparent;
    DbTableCATreeBaseNode *base = &base_container->u.base;
    DbTableCATreeNode *neighbor_base_container;
    DbTableCATreeBaseNode *neighbor_base;
    DbTableCATreeNode *new_neighbor_base;
    DbTableCATreeNode *neighbor_base_parent;
    if (parent_container == NULL) {
        base->lock_statistics = 0;
        wunlock_base_node(base);
        return;
    }
    parent = &parent_container->u.route;
    if (GET_LEFT(parent_container) == base_container) {
        neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
        neighbor_base = &neighbor_base_container->u.base;
        if (try_wlock_base_node(neighbor_base)) {
            /* Failed to acquire lock */
            base->lock_statistics = 0;
            wunlock_base_node(base);
            return;
        } else if (!neighbor_base->is_valid) {
            base->lock_statistics = 0;
            wunlock_base_node(base);
            wunlock_base_node(neighbor_base);
            return;
        } else {
            lock_route_node(parent);
            parent->is_valid = 0;
            neighbor_base->is_valid = 0;
            base->is_valid = 0;
            gparent = NULL;
            gparent_container = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent_container = parent_of(tb, parent_container);
                if (gparent_container != NULL) {
                    gparent = &gparent_container->u.route;
                    lock_route_node(gparent);
                } else {
                    gparent = NULL;
                }
            } while (gparent != NULL && !gparent->is_valid);
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
            } else if (GET_LEFT(gparent_container) == parent_container) {
                SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
            } else {
                SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            new_neighbor_base = create_catree_base_node(tb);
            new_neighbor_base->u.base.root =
                join_trees(base->root, neighbor_base->root);
            neighbor_base_parent = NULL;
            if (GET_RIGHT(parent_container) == neighbor_base_container) {
                neighbor_base_parent = gparent_container;
            } else {
                neighbor_base_parent =
                    leftmost_route_node(GET_RIGHT(parent_container));
            }
        }
    } else { /* Symetric case */
        neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
        neighbor_base = &neighbor_base_container->u.base;
        if (try_wlock_base_node(neighbor_base)) {
            /* Failed to acquire lock */
            base->lock_statistics = 0;
            wunlock_base_node(base);
            return;
        } else if (!neighbor_base->is_valid) {
            base->lock_statistics = 0;
            wunlock_base_node(base);
            wunlock_base_node(neighbor_base);
            return;
        } else {
            lock_route_node(parent);
            parent->is_valid = 0;
            neighbor_base->is_valid = 0;
            base->is_valid = 0;
            gparent = NULL;
            gparent_container = NULL;
            do {
                if (gparent != NULL) {
                    unlock_route_node(gparent);
                }
                gparent_container = parent_of(tb, parent_container);
                if (gparent_container != NULL) {
                    gparent = &gparent_container->u.route;
                    lock_route_node(gparent);
                } else {
                    gparent = NULL;
                }
            } while (gparent != NULL && !gparent->is_valid);
            if (gparent == NULL) {
                SET_ROOT_RELB(tb, GET_LEFT(parent_container));
            } else if (GET_RIGHT(gparent_container) == parent_container) {
                SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
            } else {
                SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
            }
            unlock_route_node(parent);
            if (gparent != NULL) {
                unlock_route_node(gparent);
            }
            new_neighbor_base = create_catree_base_node(tb);
            new_neighbor_base->u.base.root =
                join_trees(neighbor_base->root, base->root);
            neighbor_base_parent = NULL;
            if (GET_LEFT(parent_container) == neighbor_base_container) {
                neighbor_base_parent = gparent_container;
            } else {
                neighbor_base_parent =
                    rightmost_route_node(GET_LEFT(parent_container));
            }
        }
    }
    /* Link in new neighbor and free nodes that are no longer in the tree */
    if (neighbor_base_parent == NULL) {
        SET_ROOT_RELB(tb, new_neighbor_base);
    } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
        SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
    } else {
        SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
    }
    wunlock_base_node(base);
    wunlock_base_node(neighbor_base);
    /* Free the parent and base */
    erts_schedule_db_free(&tb->common,
                          do_free_route_node,
                          parent_container,
                          &parent->free_item,
                          sizeof_route_node(parent->key.size));
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          base_container,
                          &base->free_item,
                          sizeof(DbTableCATreeNode));
    erts_schedule_db_free(&tb->common,
                          do_free_base_node,
                          neighbor_base_container,
                          &neighbor_base->free_item,
                          sizeof(DbTableCATreeNode));
}


static void split_catree(DbTableCommon *tb,
                         DbTableCATreeNode *parent_container,
                         DbTableCATreeNode *base_container) {
    TreeDbTerm *splitOutWriteBack;
    TreeDbTerm *leftWriteBack;
    TreeDbTerm *rightWriteBack;
    DbTableCATreeNode *left_base_node;
    DbTableCATreeNode *right_base_node;
    DbTableCATreeNode *routing_node_container;
    DbTableCATreeBaseNode *base = &base_container->u.base;
    DbTableCATreeRouteNode *parent;
    if (parent_container == NULL) {
        parent = NULL;
    } else {
        parent =  &parent_container->u.route;
    }

    if (less_than_two_elements(base->root)) {
        base->lock_statistics = 0;
        wunlock_base_node(base);
        return;
    } else {
        /* Split the tree */
        split_tree(tb,
                   base->root,
                   &splitOutWriteBack,
                   &leftWriteBack,
                   &rightWriteBack);
        /* Create new base nodes */
        left_base_node =
          create_catree_base_node((DbTableCATree*)tb);
        right_base_node =
          create_catree_base_node((DbTableCATree*)tb);
        left_base_node->u.base.root = leftWriteBack;
        right_base_node->u.base.root = rightWriteBack;
        routing_node_container = create_catree_route_node(tb,
                                                          left_base_node,
                                                          right_base_node,
                                                          &splitOutWriteBack->dbterm);
        if (parent == NULL) {
            SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
        } else if(GET_LEFT(parent_container) == base_container) {
            SET_LEFT_RELB(parent_container, routing_node_container);
        } else {
            SET_RIGHT_RELB(parent_container, routing_node_container);
        }
        base->is_valid = 0;
        wunlock_base_node(base);
        erts_schedule_db_free(tb,
                              do_free_base_node,
                              base_container,
                              &base->free_item,
                              sizeof(DbTableCATreeNode));
    }
}

/*
 * Helper functions for removing the table
 */

static void catree_add_base_node_to_free_list(
        DbTableCATree *tb,
        DbTableCATreeNode *base_node_container)
{
    base_node_container->u.base.next =
        tb->base_nodes_to_free_list;
    tb->base_nodes_to_free_list = base_node_container;
}

static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
{
    if (tb->base_nodes_to_free_list == NULL) {
        return; /* List empty */
    } else {
        DbTableCATreeNode *first = tb->base_nodes_to_free_list;
        tb->base_nodes_to_free_list = first->u.base.next;
    }
}

static DbTableCATreeNode *catree_first_base_node_from_free_list(
        DbTableCATree *tb)
{
    return tb->base_nodes_to_free_list;
}

static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left)
{
    DbTableCATreeNode *root;
    DbTableCATreeNode *p;
    for (;;) {
        root = POP_NODE(&tb->free_stack_rnodes);
    	if (root == NULL) break;
        else if(root->is_base_node) {
            catree_add_base_node_to_free_list(tb, root);
            break;
        }
    	for (;;) {
            if ((GET_LEFT(root) != NULL) &&
                (p = GET_LEFT(root))->is_base_node) {
                SET_LEFT(root, NULL);
                catree_add_base_node_to_free_list(tb, p);
            } else if ((GET_RIGHT(root) != NULL) &&
                       (p = GET_RIGHT(root))->is_base_node) {
                SET_RIGHT(root, NULL);
                catree_add_base_node_to_free_list(tb, p);
            } else if ((p = GET_LEFT(root)) != NULL) {
                SET_LEFT(root, NULL);
                PUSH_NODE(&tb->free_stack_rnodes, root);
                root = p;
            } else if ((p = GET_RIGHT(root)) != NULL) {
                SET_RIGHT(root, NULL);
                PUSH_NODE(&tb->free_stack_rnodes, root);
                root = p;
            } else {
                free_catree_route_node(tb, root);
                if (--num_left >= 0) {
                    break;
                } else {
                    return num_left;	/* Done enough for now */
                }
            }
        }
    }
    return num_left;
}

static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
{
    TreeDbTerm *root;
    TreeDbTerm *p;
    DbTableCATreeNode *base_node_container =
        catree_first_base_node_from_free_list(tb);
    for (;;) {
        root = POP_NODE(&tb->free_stack_elems);
        if (root == NULL) break;
        for (;;) {
            if ((p = root->left) != NULL) {
                root->left = NULL;
                PUSH_NODE(&tb->free_stack_elems, root);
                root = p;
            } else if ((p = root->right) != NULL) {
                root->right = NULL;
                PUSH_NODE(&tb->free_stack_elems, root);
                root = p;
            } else {
                free_term((DbTable*)tb, root);
                if (--num_left >= 0) {
                    break;
                } else {
                    return num_left;	/* Done enough for now */
                }
            }
        }
    }
    catree_deque_base_node_from_free_list(tb);
    free_catree_base_node(tb, base_node_container);
    base_node_container = catree_first_base_node_from_free_list(tb);
    if (base_node_container != NULL) {
        PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root);
    }
    return num_left;
}


/*
** Initialization function
*/

void db_initialize_catree(void)
{
    return;
};

/*
** Table interface routines (i.e., what's called by the bif's)
*/

int db_create_catree(Process *p, DbTable *tbl)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *root = create_catree_base_node(tb);
    tb->deletion = 0;
    tb->base_nodes_to_free_list = NULL;
    erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
    return DB_ERROR_NONE;
}

static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableCATreeBaseNode *base_node;
    int result;
    DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
    /* Find first base node */
    base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base);
    /* Find next base node until non-empty base node is found */
    while (base_node != NULL && base_node->root == NULL) {
        base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
    }
    /* Get the return value */
    result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL);
    /* Unlock base nodes */
    unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr);
    return result;
}

static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTreeStack next_search_stack;
    TreeDbTerm *next_search_stack_array[STACK_NEED];
    DbTableCATreeBaseNode *base_node;
    int result = 0;
    DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
    init_tree_stack(&next_search_stack, next_search_stack_array, 0);
    /* Lock base node with key */
    base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack);
    /* Continue until key is not >= greatest key in base_node */
    while (base_node != NULL) {
        result = db_next_tree_common(p, tbl, base_node->root, key,
                                     ret, &next_search_stack);
        if (result != DB_ERROR_NONE || *ret != am_EOT) {
            break;
        }
        base_node =
            find_and_lock_next_base_node_and_path(tbl,
                                                  &search_stack_ptr,
                                                  &search_stack_copy_ptr,
                                                  locked_base_nodes_stack_ptr);
    }
    unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack);
    return result;
}

static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
    int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
    
}

static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTreeStack stack;
    TreeDbTerm * stack_array[STACK_NEED];
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    init_tree_stack(&stack, stack_array, 0);
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
    wunlock_base_node(&(base->u.base));
    return result;
}

#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
                                           ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
                                           db_put_tree_common(&tb->common,
                                                              &base_node->root,
                                                              obj,
                                                              key_clash_fail,
                                                              NULL);)

static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
    DbTableCATree *tb = &tbl->catree;
    Eterm key = GETKEY(&tb->common, tuple_val(obj));
    return erl_db_catree_do_operation_put(tb,
                                          key,
                                          obj,
                                          key_clash_fail);
}

#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
                                                ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
                                                db_get_tree_common(p, &tb->common,
                                                                   base_node->root,
                                                                   key, ret, NULL);)

static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    return erl_db_catree_do_operation_get(tb, key, p, ret);
}

#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
                                                ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
                                                db_member_tree_common(&tb->common,
                                                                      base_node->root,
                                                                      key,
                                                                      ret,
                                                                      NULL);)

static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    return erl_db_catree_do_operation_member(tb, key, ret);
}

#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
                                                ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
                                                db_get_element_tree_common(p, &tb->common,
                                                                           base_node->root,
                                                                           key, ndex,
                                                                           ret, NULL))

static int db_get_element_catree(Process *p, DbTable *tbl,
			       Eterm key, int ndex, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
}

#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
                                           ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
                                           db_erase_tree_common(tbl,
                                                                &base_node->root,
                                                                key,
                                                                ret,
                                                                NULL);)

static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
}

#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
                                           ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
                                           db_erase_object_tree_common(tbl,
                                                                       &base_node->root,
                                                                       object,
                                                                       ret,
                                                                       NULL);)

static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    Eterm key = GETKEY(&tb->common, tuple_val(object));
    return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
}


static int db_slot_catree(Process *p, DbTable *tbl,
                          Eterm slot_term, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_slot_tree_common(p, tbl, base->u.base.root,
                                 slot_term, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_continue_catree(Process *p,
                                     DbTable *tbl,
                                     Eterm continuation,
                                     Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_continue_tree_common(p, &tb->common, &base->u.base.root,
                                            continuation, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}


static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
                            Eterm pattern, int reverse, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_tree_common(p, &tb->common, &base->u.base.root,
                                   tid, pattern, reverse, ret,
                                   NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_count_continue_catree(Process *p,
                                           DbTable *tbl,
                                           Eterm continuation,
                                           Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_count_continue_tree_common(p, &tb->common,
                                                  &base->u.base.root,
                                                  continuation, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_count_tree_common(p, &tb->common, &base->u.base.root,
                                         tid, pattern, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
                                  Eterm pattern, Sint chunk_size,
                                  int reversed, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_chunk_tree_common(p, &tb->common, &base->u.base.root,
                                         tid, pattern, chunk_size, reversed, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_delete_continue_catree(Process *p,
                                            DbTable *tbl,
                                            Eterm continuation,
                                            Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTreeStack stack;
    TreeDbTerm * stack_array[STACK_NEED];
    int result;
    DbTableCATreeNode *base;
    init_tree_stack(&stack, stack_array, 0);
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
                                                   continuation, ret, &stack);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
                                   Eterm pattern, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    DbTreeStack stack;
    TreeDbTerm * stack_array[STACK_NEED];
    int result;
    DbTableCATreeNode *base;
    init_tree_stack(&stack, stack_array, 0);
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
                                          tid, pattern, ret, &stack);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
                                    Eterm pattern, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_replace_tree_common(p, &tb->common,
                                           &base->u.base.root,
                                           tid, pattern, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
                                             Eterm continuation, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    int result;
    DbTableCATreeNode *base;
    base = merge_to_one_locked_base_node(&tb->common);
    result = db_select_replace_continue_tree_common(p, &tb->common,
                                                    &base->u.base.root,
                                                    continuation, ret, NULL);
    wunlock_base_node(&(base->u.base));
    return result;
}

#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
                                           ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
                                           db_take_tree_common(p, tbl,
                                                               &base_node->root,
                                                               key, ret, NULL);)

static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
    DbTableCATree *tb = &tbl->catree;
    return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
}

/*
** Other interface routines (not directly coupled to one bif)
*/


/* Display tree contents (for dump) */
static void db_print_catree(fmtfn_t to, void *to_arg,
                            int show, DbTable *tbl)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
    db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
    wunlock_base_node(&(base->u.base));
}

/* Release all memory occupied by a single table */
static int db_free_table_catree(DbTable *tbl)
{
    while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0)
	;
    return 1;
}

static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
{
    DbTableCATreeNode *first_base_node;
    DbTableCATree *tb = &tbl->catree;
    if (!tb->deletion) {
        tb->deletion = 1;
        tb->free_stack_elems.array =
            erts_db_alloc(ERTS_ALC_T_DB_STK,
                          (DbTable *) tb,
                          sizeof(TreeDbTerm *) * STACK_NEED);
        tb->free_stack_elems.pos = 0;
        tb->free_stack_elems.slot = 0;
        tb->free_stack_rnodes.array =
            erts_db_alloc(ERTS_ALC_T_DB_STK,
                          (DbTable *) tb,
                          sizeof(DbTableCATreeNode *) * STACK_NEED);
        tb->free_stack_rnodes.pos = 0;
        tb->free_stack_rnodes.size = STACK_NEED;
        PUSH_NODE(&tb->free_stack_rnodes, GET_ROOT(tb));
        tb->is_routing_nodes_freed = 0;
        tb->base_nodes_to_free_list = NULL;
    }
    if ( ! tb->is_routing_nodes_freed ) {
        reds = do_free_routing_nodes_catree_cont(tb, reds);
        if (reds < 0) {
            return reds; /* Not finished */
        } else {
            tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
            first_base_node = catree_first_base_node_from_free_list(tb);
            PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root);
        }
    }
    while (catree_first_base_node_from_free_list(tb) != NULL) {
        reds = do_free_base_node_cont(tb, reds);
        if (reds < 0) {
            return reds; /* Continue later */
        }
    }
    /* Time to free the main structure*/
    erts_db_free(ERTS_ALC_T_DB_STK,
                 (DbTable *) tb,
                 (void *) tb->free_stack_elems.array,
                 sizeof(TreeDbTerm *) * STACK_NEED);
    erts_db_free(ERTS_ALC_T_DB_STK,
                 (DbTable *) tb,
                 (void *) tb->free_stack_rnodes.array,
                 sizeof(DbTableCATreeNode *) * STACK_NEED);
    return 1;
}

static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds)
{
    reds = db_free_table_continue_catree(tbl, reds);
    if (reds < 0)
        return reds;
    db_create_catree(p, tbl);
    erts_atomic_set_nob(&tbl->catree.common.nitems, 0);
    return reds;
}


static void db_foreach_offheap_catree(DbTable *tbl,
                                      void (*func)(ErlOffHeap *, void *),
                                      void *arg)
{
    DbTableCATree *tb = &tbl->catree;
    DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
    db_foreach_offheap_tree_common(base->u.base.root, func, arg);
    wunlock_base_node(&(base->u.base));
}

static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
                                   DbUpdateHandle *handle)
{
    DbTableCATree *tb = &tbl->catree;
    int res;
    ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
    res =  db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
    if (res == 0) {
        ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
    } else {
        /* db_finalize_dbterm_catree will unlock */
        handle->lck = prev_node;
        handle->lck2 = current_node;
        handle->current_level = current_level;
    }
    return res;
}

static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
    DbTableCATree *tb = &(handle->tb->catree);
    DbTableCATreeNode *prev_node = handle->lck;    
    DbTableCATreeNode *current_node = handle->lck2;
    int current_level = handle->current_level;
    DbTableCATreeBaseNode *base_node = &current_node->u.base;
    db_finalize_dbterm_tree_common(cret, handle, NULL);
    ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
    return;
}

#ifdef ERTS_ENABLE_LOCK_COUNT
static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
                                                         DbTableCATreeNode *node,
                                                         int enable)
{
    erts_lcnt_ref_t *lcnt_ref;
    erts_lock_flags_t lock_type;
    if (node->is_base_node) {
        lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt;
        lock_type = ERTS_LOCK_TYPE_RWMUTEX;
    } else {
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
        erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
        lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
        lock_type = ERTS_LOCK_TYPE_MUTEX;
    }
    if (enable) {
        erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name,
                                        lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB);
    } else {
        erts_lcnt_uninstall(lcnt_ref);
    }
}

void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
{
    erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable);
}
#endif /* ERTS_ENABLE_LOCK_COUNT */


#ifdef HARDDEBUG

/*
 * Not called, but kept as it might come to use
 */
static inline int my_check_table_tree(TreeDbTerm *t)
{
    int lh, rh;
    if (t == NULL)
	return 0;
    lh = my_check_table_tree(t->left);
    rh = my_check_table_tree(t->right);
    if ((rh - lh) != t->balance) {
	erts_fprintf(stderr, "Invalid tree balance for this node:\n");
	erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
		     t->balance, t->left, t->right);
	erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
	erts_fprintf(stderr,"\n---------------------------------\n");
        abort();
    }
    return ((rh > lh) ? rh : lh) + 1;
}

#endif