diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 1414 |
1 files changed, 733 insertions, 681 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index a8e48bce1b..b52a4a53fe 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -96,6 +96,12 @@ #include "erl_db_tree.h" #include "erl_db_tree_util.h" +#ifdef DEBUG +# define IF_DEBUG(X) X +#else +# define IF_DEBUG(X) +#endif + /* ** Forward declarations */ @@ -158,6 +164,14 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, DbUpdateHandle*); static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); +static void split_catree(DbTableCATree *tb, + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent); +static void join_catree(DbTableCATree *tb, + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent); + + /* ** External interface */ @@ -241,30 +255,6 @@ static ERTS_INLINE Sint cmp_key_route(Eterm key, return CMP(key, GET_ROUTE_NODE_KEY(obj)); } -static ERTS_INLINE void push_node_dyn_array(DbTable *tb, - CATreeNodeStack *stack, - DbTableCATreeNode *node) -{ - int i; - if (stack->pos == stack->size) { - DbTableCATreeNode **newArray = - erts_db_alloc(ERTS_ALC_T_DB_STK, tb, - sizeof(DbTableCATreeNode*) * (stack->size*2)); - for (i = 0; i < stack->pos; i++) { - newArray[i] = stack->array[i]; - } - if (stack->size > STACK_NEED) { - /* Dynamically allocated array that needs to be deallocated */ - erts_db_free(ERTS_ALC_T_DB_STK, tb, - stack->array, - sizeof(DbTableCATreeNode *) * stack->size); - } - stack->array = newArray; - stack->size = stack->size*2; - } - PUSH_NODE(stack, node); -} - /* * Used by the split_tree function */ @@ -578,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, p = *this; if (dir == DIR_LEFT) { switch (p->balance) { - case 1: + case 1: p->balance = 0; state = 0; break; - case 0: + case 0: p->balance = -1; break; - case -1: /* The icky case */ + case -1: /* The icky case */ p1 = p->left; if (p1->balance == -1) { /* Single LL rotation */ p->left = p1->right; @@ -608,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, } } else { /* dir == DIR_RIGHT */ switch (p->balance) { - case -1: + case -1: p->balance = 0; state = 0; break; - case 0: + case 0: p->balance = 1; break; - case 1: + case 1: p1 = p->right; if (p1->balance == 1) { /* Single RR rotation */ p->right = p1->left; @@ -646,6 +636,31 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, } } +#ifdef DEBUG +# define FORCE_RANDOM_SPLIT_JOIN +#endif +#ifdef FORCE_RANDOM_SPLIT_JOIN +static int dbg_fastrand(void) +{ + static int g_seed = 648835; + g_seed = (214013*g_seed+2531011); + return (g_seed>>16)&0x7FFF; +} + +static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node) +{ + switch (dbg_fastrand() % 8) { + case 1: + base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT; + break; + case 2: + base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT; + break; + } +} +#else +# define dbg_maybe_force_splitjoin(N) +#endif /* FORCE_RANDOM_SPLIT_JOIN */ static ERTS_INLINE int try_wlock_base_node(DbTableCATreeBaseNode *base_node) @@ -657,9 +672,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node) * Locks a base node without adjusting the lock statistics */ static ERTS_INLINE -void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node) +void wlock_base_node_no_stats(DbTableCATreeNode *base_node) { - erts_rwmtx_rwlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_rwlock(&base_node->u.base.lock); } /* @@ -667,33 +683,57 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node) * the lock was contended or not */ static ERTS_INLINE -void wlock_base_node(DbTableCATreeBaseNode *base_node) +void wlock_base_node(DbTableCATreeNode *base_node) { - if (try_wlock_base_node(base_node)) { + ASSERT(base_node->is_base_node); + if (try_wlock_base_node(&base_node->u.base)) { /* The lock is contended */ wlock_base_node_no_stats(base_node); - base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; } else { - base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; } } static ERTS_INLINE -void wunlock_base_node(DbTableCATreeBaseNode *base_node) +void wunlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_rwunlock(&base_node->lock); + erts_rwmtx_rwunlock(&base_node->u.base.lock); } static ERTS_INLINE -void rlock_base_node(DbTableCATreeBaseNode *base_node) +void wunlock_adapt_base_node(DbTableCATree* tb, + DbTableCATreeNode* node, + DbTableCATreeNode* parent, + int current_level) +{ + dbg_maybe_force_splitjoin(node); + if ((!node->u.base.root && parent && !(tb->common.status + & DB_CATREE_FORCE_SPLIT)) + || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { + join_catree(tb, node, parent); + } + else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT + && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { + split_catree(tb, node, parent); + } + else { + wunlock_base_node(node); + } +} + +static ERTS_INLINE +void rlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_rlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_rlock(&base_node->u.base.lock); } static ERTS_INLINE -void runlock_base_node(DbTableCATreeBaseNode *base_node) +void runlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_runlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_runlock(&base_node->u.base.lock); } static ERTS_INLINE @@ -710,80 +750,121 @@ void unlock_route_node(DbTableCATreeNode *route_node) erts_mtx_unlock(&route_node->u.route.lock); } +static ERTS_INLINE +void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter, + int read_only) +{ + iter->tb = tb; + iter->read_only = read_only; + iter->locked_bnode = NULL; + iter->next_route_key = THE_NON_VALUE; + iter->search_key = NULL; +} -/* - * The following macros are used to create the ETS functions that only - * need to access one element (e.g. db_put_catree, db_get_catree and - * db_erase_catree). - */ +static ERTS_INLINE +void lock_iter_base_node(CATreeRootIterator* iter, + DbTableCATreeNode *base_node, + DbTableCATreeNode *parent, + int current_level) +{ + ASSERT(!iter->locked_bnode); + if (iter->read_only) + rlock_base_node(base_node); + else { + wlock_base_node(base_node); + iter->bnode_parent = parent; + iter->bnode_level = current_level; + } + iter->locked_bnode = base_node; +} -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ - int retry; \ - DbTableCATreeNode *current_node; \ - DbTableCATreeNode *prev_node; \ - DbTableCATreeBaseNode *base_node; \ - int current_level; \ - (void)prev_node; \ - do { \ - retry = 0; \ - current_node = GET_ROOT_ACQB(tb); \ - prev_node = NULL; \ - current_level = 0; \ - while ( ! current_node->is_base_node ) { \ - current_level = current_level + 1; \ - prev_node = current_node; \ - if (cmp_key_route(key,current_node) < 0) { \ - current_node = GET_LEFT_ACQB(current_node); \ - } else { \ - current_node = GET_RIGHT_ACQB(current_node); \ - } \ - } \ - base_node = ¤t_node->u.base; \ - LOCK(base_node); \ - if ( ! base_node->is_valid ) { \ - /* Retry */ \ - UNLOCK(base_node); \ - retry = 1; \ - } \ - } while(retry); - - -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ - if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \ - && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \ - split_catree(tb, prev_node, current_node); \ - } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \ - join_catree(tb, prev_node, current_node); \ - } else { \ - wunlock_base_node(base_node); \ +static ERTS_INLINE +void unlock_iter_base_node(CATreeRootIterator* iter) +{ + ASSERT(iter->locked_bnode); + if (iter->read_only) + runlock_base_node(iter->locked_bnode); + else if (iter->locked_bnode->u.base.is_valid) { + wunlock_adapt_base_node(iter->tb, iter->locked_bnode, + iter->bnode_parent, iter->bnode_level); } + else + wunlock_base_node(iter->locked_bnode); + iter->locked_bnode = NULL; +} + +static ERTS_INLINE +void destroy_root_iterator(CATreeRootIterator* iter) +{ + if (iter->locked_bnode) + unlock_iter_base_node(iter); + if (iter->search_key) + erts_free(ERTS_ALC_T_DB_TMP, iter->search_key); +} -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ - static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ - Eterm key, \ - PARAMETERS){ \ - int result; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \ - result = SEQUENTAIL_CALL; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ - return result; \ + +typedef struct +{ + DbTableCATreeNode *parent; + int current_level; +} FindBaseNode; + +static ERTS_INLINE +DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); + if (fbn) { + fbn->parent = NULL; + fbn->current_level = 0; } + while (!node->is_base_node) { + if (fbn) { + fbn->current_level++; + fbn->parent = node; + } + if (cmp_key_route(key, node) < 0) { + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + return node; +} +static ERTS_INLINE +DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key) +{ + DbTableCATreeNode* base_node; -#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ - static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ - Eterm key, \ - PARAMETERS){ \ - int result; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \ - result = SEQUENTAIL_CALL; \ - runlock_base_node(base_node); \ - return result; \ + while (1) { + base_node = find_base_node(tb, key, NULL); + rlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + runlock_base_node(base_node); } + return base_node; +} + +static ERTS_INLINE +DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* base_node; + while (1) { + base_node = find_base_node(tb, key, fbn); + wlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + wunlock_base_node(base_node); + } + return base_node; +} static ERTS_INLINE -void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size) +Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size) { dst->size = key_size; if (key_size != 0) { @@ -798,6 +879,7 @@ void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size) dst->term = key; dst->oh = NULL; } + return dst->term; } static ERTS_INLINE @@ -812,27 +894,21 @@ void destroy_route_key(DbRouteKey* key) #ifdef ERTS_ENABLE_LOCK_CHECK -# define sizeof_base_node(KEY_SZ) \ - (offsetof(DbTableCATreeNode, u.base.lc_key.heap) \ - + (KEY_SZ)*sizeof(Eterm)) # define LC_ORDER(ORDER) ORDER #else -# define sizeof_base_node(KEY_SZ) \ - offsetof(DbTableCATreeNode, u.base.end_of_struct__) # define LC_ORDER(ORDER) NIL #endif +#define sizeof_base_node() \ + offsetof(DbTableCATreeNode, u.base.end_of_struct__) + static DbTableCATreeNode *create_base_node(DbTableCATree *tb, - TreeDbTerm* root, - Eterm lc_key) + TreeDbTerm* root) { DbTableCATreeNode *p; erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; -#ifdef ERTS_ENABLE_LOCK_CHECK - Eterm lc_key_size = size_object(lc_key); -#endif p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb, - sizeof_base_node(lc_key_size)); + sizeof_base_node()); p->is_base_node = 1; p->u.base.root = root; @@ -841,32 +917,16 @@ static DbTableCATreeNode *create_base_node(DbTableCATree *tb, if (erts_ets_rwmtx_spin_count >= 0) rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; -#ifdef ERTS_ENABLE_LOCK_CHECK - copy_route_key(&p->u.base.lc_key, lc_key, lc_key_size); -#endif erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt, "erl_db_catree_base_node", - lc_key, + NIL, ERTS_LOCK_FLAGS_CATEGORY_DB); - p->u.base.lock_statistics = 0; + p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT) + ? INT_MAX : 0); p->u.base.is_valid = 1; return p; } -static ERTS_INLINE -DbTableCATreeNode *create_wlocked_base_node(DbTableCATree *tb, - TreeDbTerm* root, - Eterm lc_key) -{ - DbTableCATreeNode* p = create_base_node(tb, root, lc_key); - ethr_rwmutex_rwlock(&p->u.base.lock.rwmtx); -#ifdef ERTS_ENABLE_LOCK_CHECK - erts_lc_trylock_flg(-1, &p->u.base.lock.lc, ERTS_LOCK_OPTIONS_RDWR); -#endif - return p; -} - - static ERTS_INLINE Uint sizeof_route_node(Uint key_size) { return (offsetof(DbTableCATreeNode, u.route.key.heap) @@ -913,16 +973,13 @@ static void do_free_base_node(void* vptr) DbTableCATreeNode *p = (DbTableCATreeNode *)vptr; ASSERT(p->is_base_node); erts_rwmtx_destroy(&p->u.base.lock); -#ifdef ERTS_ENABLE_LOCK_CHECK - destroy_route_key(&p->u.base.lc_key); -#endif erts_free(ERTS_ALC_T_DB_TABLE, p); } static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p) { ASSERT(p->is_base_node); - ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(p->u.base.lc_key.size), 0); + ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0); do_free_base_node(p); } @@ -1014,154 +1071,6 @@ rightmost_route_node(DbTableCATreeNode *root) return prev_node; } -static ERTS_INLINE DbTableCATreeNode* -leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack) -{ - DbTableCATreeNode * node = root; - while (!node->is_base_node) { - PUSH_NODE(stack, node); - node = GET_LEFT_ACQB(node); - } - return node; -} - -static ERTS_INLINE DbTableCATreeNode* -get_next_base_node_and_path(DbTableCATreeNode *base_node, - CATreeNodeStack *stack) -{ - if (EMPTY_NODE(stack)) { /* The parent of b is the root */ - return NULL; - } else { - if (GET_LEFT(TOP_NODE(stack)) == base_node) { - return leftmost_base_node_and_path( - GET_RIGHT_ACQB(TOP_NODE(stack)), - stack); - } else { - Eterm pkey = - TOP_NODE(stack)->u.route.key.term; /* pKey = key of parent */ - POP_NODE(stack); - while (!EMPTY_NODE(stack)) { - if (TOP_NODE(stack)->u.route.is_valid && - cmp_key_route(pkey, TOP_NODE(stack)) <= 0) { - return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack); - } else { - POP_NODE(stack); - } - } - } - return NULL; - } -} - -static ERTS_INLINE void -clone_stack(CATreeNodeStack *search_stack_ptr, - CATreeNodeStack *search_stack_copy_ptr) -{ - int i; - search_stack_copy_ptr->pos = search_stack_ptr->pos; - for (i = 0; i < search_stack_ptr->pos; i++) { - search_stack_copy_ptr->array[i] = search_stack_ptr->array[i]; - } -} - - - -static ERTS_INLINE DbTableCATreeNode* -lock_first_base_node(DbTable *tbl, - CATreeNodeStack *search_stack_ptr, - CATreeNodeStack *locked_base_nodes_stack_ptr) -{ - DbTableCATreeNode *current_node; - DbTableCATreeBaseNode *base_node; - DbTableCATree* tb = &tbl->catree; - while (1) { - current_node = GET_ROOT_ACQB(tb); - while ( ! current_node->is_base_node ) { - PUSH_NODE(search_stack_ptr, current_node); - current_node = GET_LEFT_ACQB(current_node); - } - base_node = ¤t_node->u.base; - rlock_base_node(base_node); - if (base_node->is_valid) - break; - /* Retry */ - runlock_base_node(base_node); - search_stack_ptr->pos = 0; - } - push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); - return current_node; -} - -static ERTS_INLINE DbTableCATreeBaseNode* -find_and_lock_next_base_node_and_path(DbTable *tbl, - CATreeNodeStack **search_stack_ptr_ptr, - CATreeNodeStack **search_stack_copy_ptr_ptr, - CATreeNodeStack *locked_base_nodes_stack_ptr) -{ - DbTableCATreeNode *current_node; - DbTableCATreeBaseNode *base_node; - CATreeNodeStack * tmp_stack_ptr; - - while (1) { - current_node = TOP_NODE(locked_base_nodes_stack_ptr); - clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr); - current_node = - get_next_base_node_and_path(current_node, *search_stack_ptr_ptr); - if (current_node == NULL) { - return NULL; - } - base_node = ¤t_node->u.base; - rlock_base_node(base_node); - if (base_node->is_valid) - break; - - /* Retry */ - runlock_base_node(base_node); - /* Revert to previous state */ - current_node = TOP_NODE(locked_base_nodes_stack_ptr); - tmp_stack_ptr = *search_stack_ptr_ptr; - *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr; - *search_stack_copy_ptr_ptr = tmp_stack_ptr; - } - - push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); - return base_node; -} - -static ERTS_INLINE -void unlock_and_release_locked_base_node_stack(DbTable *tbl, - CATreeNodeStack *locked_base_nodes_stack_ptr) -{ - DbTableCATreeNode *current_node; - DbTableCATreeBaseNode *base_node; - int i; - for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) { - current_node = locked_base_nodes_stack_ptr->array[i]; - base_node = ¤t_node->u.base; - if (locked_base_nodes_stack_ptr->pos > 1) { - base_node->lock_statistics = /* This is not atomic which is fine as */ - base_node->lock_statistics + /* correctness does not depend on that. */ - ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION; - } - runlock_base_node(base_node); - } - if (locked_base_nodes_stack_ptr->size > STACK_NEED) { - erts_db_free(ERTS_ALC_T_DB_STK, tbl, - locked_base_nodes_stack_ptr->array, - sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size); - } -} - -static ERTS_INLINE -void init_stack(CATreeNodeStack *stack, - DbTableCATreeNode **stack_array, - Uint init_size) -{ - stack->array = stack_array; - stack->pos = 0; - stack->size = init_size; -} - static ERTS_INLINE void init_tree_stack(DbTreeStack *stack, TreeDbTerm **stack_array, @@ -1172,194 +1081,9 @@ void init_tree_stack(DbTreeStack *stack, stack->slot = init_slot; } -#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \ - CATreeNodeStack STACK_NAME; \ - CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \ - DbTableCATreeNode *STACK_NAME##_array[STACK_NEED]; - -#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \ -DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \ -DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \ -DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \ -init_stack(&search_stack, search_stack_array, 0); \ -init_stack(&search_stack_copy, search_stack_copy_array, 0); \ -init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/ - -/* - * Locks and returns the base node that contains the specified key if - * it is present. The function saves the search path to the found base - * node in search_stack_ptr and adds the found base node to - * locked_base_nodes_stack_ptr. - */ -static ERTS_INLINE DbTableCATreeBaseNode * -lock_base_node_with_key(DbTable *tbl, - Eterm key, - CATreeNodeStack * search_stack_ptr, - CATreeNodeStack * locked_base_nodes_stack_ptr) -{ - int retry; - DbTableCATreeNode *current_node; - DbTableCATreeBaseNode *base_node; - DbTableCATree* tb = &tbl->catree; - do { - retry = 0; - current_node = GET_ROOT_ACQB(tb); - while ( ! current_node->is_base_node ) { - PUSH_NODE(search_stack_ptr, current_node); - if( cmp_key_route(key,current_node) < 0 ) { - current_node = GET_LEFT_ACQB(current_node); - } else { - current_node = GET_RIGHT_ACQB(current_node); - } - } - base_node = ¤t_node->u.base; - rlock_base_node(base_node); - if ( ! base_node->is_valid ) { - /* Retry */ - runlock_base_node(base_node); - retry = 1; - } - } while(retry); - push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); - return base_node; -} - -/* - * Joins a base node with it's right neighbor. Returns the base node - * resulting from the join in locked state or NULL if there is no base - * node to join with. - */ -static DbTableCATreeNode* -erl_db_catree_force_join_right(DbTableCATree *tb, - DbTableCATreeNode *parent, - DbTableCATreeNode *thiz, - DbTableCATreeNode **result_parent_wb) -{ - DbTableCATreeNode *gparent; - DbTableCATreeNode *neighbor; - DbTableCATreeNode *new_neighbor; - DbTableCATreeNode *neighbor_parent; - TreeDbTerm* new_root; - - if (parent == NULL) { - return NULL; - } - ASSERT(thiz == GET_LEFT(parent)); - while (1) { - neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent)); - wlock_base_node_no_stats(&neighbor->u.base); - if (neighbor->u.base.is_valid) - break; - wunlock_base_node(&neighbor->u.base); - } - lock_route_node(parent); - parent->u.route.is_valid = 0; - neighbor->u.base.is_valid = 0; - thiz->u.base.is_valid = 0; - while (1) { - gparent = parent_of(tb, parent); - if (gparent == NULL) - break; - lock_route_node(gparent); - if (gparent->u.route.is_valid) - break; - unlock_route_node(gparent); - } - if (gparent == NULL) { - SET_ROOT_RELB(tb, GET_RIGHT(parent)); - } else if (GET_LEFT(gparent) == parent) { - SET_LEFT_RELB(gparent, GET_RIGHT(parent)); - } else { - SET_RIGHT_RELB(gparent, GET_RIGHT(parent)); - } - unlock_route_node(parent); - if (gparent != NULL) { - unlock_route_node(gparent); - } - - new_root = join_trees(thiz->u.base.root, neighbor->u.base.root); - new_neighbor = create_wlocked_base_node(tb, new_root, - LC_ORDER(thiz->u.base.lc_key.term)); - - if (GET_RIGHT(parent) == neighbor) { - neighbor_parent = gparent; - } else { - neighbor_parent = leftmost_route_node(GET_RIGHT(parent)); - } - if(neighbor_parent == NULL) { - SET_ROOT_RELB(tb, new_neighbor); - } else if (GET_LEFT(neighbor_parent) == neighbor) { - SET_LEFT_RELB(neighbor_parent, new_neighbor); - } else { - SET_RIGHT_RELB(neighbor_parent, new_neighbor); - } - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); - /* Free the parent and base */ - erts_schedule_db_free(&tb->common, - do_free_route_node, - parent, - &parent->u.route.free_item, - sizeof_route_node(parent->u.route.key.size)); - erts_schedule_db_free(&tb->common, - do_free_base_node, - thiz, - &thiz->u.base.free_item, - sizeof_base_node(thiz->u.base.lc_key.size)); - erts_schedule_db_free(&tb->common, - do_free_base_node, - neighbor, - &neighbor->u.base.free_item, - sizeof_base_node(neighbor->u.base.lc_key.size)); - - *result_parent_wb = neighbor_parent; - return new_neighbor; -} - -/* - * Used to merge together all base nodes for operations such as last - * and select_*. Returns the base node resulting from the merge in - * locked state. - */ -static DbTableCATreeNode * -merge_to_one_locked_base_node(DbTableCATree* tb) -{ - DbTableCATreeNode *parent; - DbTableCATreeNode *new_parent; - DbTableCATreeNode *base; - DbTableCATreeNode *new_base; - int is_not_valid; - /* Find first base node */ - do { - parent = NULL; - base = GET_ROOT_ACQB(tb); - while ( ! base->is_base_node ) { - parent = base; - base = GET_LEFT_ACQB(base); - } - wlock_base_node_no_stats(&(base->u.base)); - is_not_valid = ! base->u.base.is_valid; - if (is_not_valid) { - wunlock_base_node(&(base->u.base)); - } - } while(is_not_valid); - do { - new_base = erl_db_catree_force_join_right(tb, - parent, - base, - &new_parent); - if (new_base != NULL) { - base = new_base; - parent = new_parent; - } - } while(new_base != NULL); - return base; -} - - static void join_catree(DbTableCATree *tb, - DbTableCATreeNode *parent, - DbTableCATreeNode *thiz) + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent) { DbTableCATreeNode *gparent; DbTableCATreeNode *neighbor; @@ -1369,7 +1093,7 @@ static void join_catree(DbTableCATree *tb, ASSERT(thiz->is_base_node); if (parent == NULL) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } ASSERT(!parent->is_base_node); @@ -1378,12 +1102,12 @@ static void join_catree(DbTableCATree *tb, if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } else if (!neighbor->u.base.is_valid) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); return; } else { lock_route_node(parent); @@ -1414,8 +1138,7 @@ static void join_catree(DbTableCATree *tb, { TreeDbTerm* new_root = join_trees(thiz->u.base.root, neighbor->u.base.root); - new_neighbor = create_base_node(tb, new_root, - LC_ORDER(thiz->u.base.lc_key.term)); + new_neighbor = create_base_node(tb, new_root); } if (GET_RIGHT(parent) == neighbor) { neighbor_parent = gparent; @@ -1429,12 +1152,12 @@ static void join_catree(DbTableCATree *tb, if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } else if (!neighbor->u.base.is_valid) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); return; } else { lock_route_node(parent); @@ -1467,8 +1190,7 @@ static void join_catree(DbTableCATree *tb, { TreeDbTerm* new_root = join_trees(neighbor->u.base.root, thiz->u.base.root); - new_neighbor = create_base_node(tb, new_root, - LC_ORDER(thiz->u.base.lc_key.term)); + new_neighbor = create_base_node(tb, new_root); } if (GET_LEFT(parent) == neighbor) { neighbor_parent = gparent; @@ -1486,8 +1208,8 @@ static void join_catree(DbTableCATree *tb, } else { SET_RIGHT_RELB(neighbor_parent, new_neighbor); } - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); /* Free the parent and base */ erts_schedule_db_free(&tb->common, do_free_route_node, @@ -1498,25 +1220,27 @@ static void join_catree(DbTableCATree *tb, do_free_base_node, thiz, &thiz->u.base.free_item, - sizeof_base_node(thiz->u.base.lc_key.size)); + sizeof_base_node()); erts_schedule_db_free(&tb->common, do_free_base_node, neighbor, &neighbor->u.base.free_item, - sizeof_base_node(neighbor->u.base.lc_key.size)); + sizeof_base_node()); } static void split_catree(DbTableCATree *tb, - DbTableCATreeNode* ERTS_RESTRICT parent, - DbTableCATreeNode* ERTS_RESTRICT base) { + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent) +{ TreeDbTerm *splitOutWriteBack; DbTableCATreeNode* ERTS_RESTRICT new_left; DbTableCATreeNode* ERTS_RESTRICT new_right; DbTableCATreeNode* ERTS_RESTRICT new_route; if (less_than_two_elements(base->u.base.root)) { - base->u.base.lock_statistics = 0; - wunlock_base_node(&base->u.base); + if (!(tb->common.status & DB_CATREE_FORCE_SPLIT)) + base->u.base.lock_statistics = 0; + wunlock_base_node(base); return; } else { TreeDbTerm *left_tree; @@ -1525,10 +1249,8 @@ static void split_catree(DbTableCATree *tb, split_tree(tb, base->u.base.root, &splitOutWriteBack, &left_tree, &right_tree); - new_left = create_base_node(tb, left_tree, - LC_ORDER(GETKEY(tb, left_tree->dbterm.tpl))); - new_right = create_base_node(tb, right_tree, - LC_ORDER(GETKEY(tb, right_tree->dbterm.tpl))); + new_left = create_base_node(tb, left_tree); + new_right = create_base_node(tb, right_tree); new_route = create_route_node(tb, new_left, new_right, @@ -1542,12 +1264,12 @@ static void split_catree(DbTableCATree *tb, SET_RIGHT_RELB(parent, new_route); } base->u.base.is_valid = 0; - wunlock_base_node(&base->u.base); + wunlock_base_node(base); erts_schedule_db_free(&tb->common, do_free_base_node, base, &base->u.base.free_item, - sizeof_base_node(base->u.base.lc_key.size)); + sizeof_base_node()); } } @@ -1675,7 +1397,9 @@ void db_initialize_catree(void) int db_create_catree(Process *p, DbTable *tbl) { DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *root = create_base_node(tb, NULL, NIL); + DbTableCATreeNode *root; + + root = create_base_node(tb, NULL); tb->deletion = 0; tb->base_nodes_to_free_list = NULL; erts_atomic_init_relb(&(tb->root), (erts_aint_t)root); @@ -1684,162 +1408,416 @@ int db_create_catree(Process *p, DbTable *tbl) static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret) { - DbTableCATreeBaseNode *base_node; + TreeDbTerm *root; + CATreeRootIterator iter; int result; - DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS; - /* Find first base node */ - base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base); - /* Find next base node until non-empty base node is found */ - while (base_node != NULL && base_node->root == NULL) { - base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr); + + init_root_iterator(&tbl->catree, &iter, 1); + root = *catree_find_first_root(&iter); + if (!root) { + TreeDbTerm **pp = catree_find_next_root(&iter, NULL); + root = pp ? *pp : NULL; } - /* Get the return value */ - result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL); - /* Unlock base nodes */ - unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr); + + result = db_first_tree_common(p, tbl, root, ret, NULL); + + destroy_root_iterator(&iter); return result; } static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { - DbTreeStack next_search_stack; - TreeDbTerm *next_search_stack_array[STACK_NEED]; - DbTableCATreeBaseNode *base_node; - int result = 0; - DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS; - init_tree_stack(&next_search_stack, next_search_stack_array, 0); - /* Lock base node with key */ - base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack); - /* Continue until key is not >= greatest key in base_node */ - while (base_node != NULL) { - result = db_next_tree_common(p, tbl, base_node->root, key, - ret, &next_search_stack); - if (result != DB_ERROR_NONE || *ret != am_EOT) { + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + TreeDbTerm **rootp; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + iter.next_route_key = key; + rootp = catree_find_next_root(&iter, NULL); + + do { + init_tree_stack(&stack, stack_array, 0); + result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack); + if (result != DB_ERROR_NONE || *ret != am_EOT) break; - } - base_node = - find_and_lock_next_base_node_and_path(tbl, - &search_stack_ptr, - &search_stack_copy_ptr, - locked_base_nodes_stack_ptr); - } - unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack); + + rootp = catree_find_next_root(&iter, NULL); + } while (rootp); + + destroy_root_iterator(&iter); return result; } static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret) { - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); - int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL); - wunlock_base_node(&(base->u.base)); + TreeDbTerm *root; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + root = *catree_find_last_root(&iter); + if (!root) { + TreeDbTerm **pp = catree_find_prev_root(&iter, NULL); + root = pp ? *pp : NULL; + } + + result = db_last_tree_common(p, tbl, root, ret, NULL); + + destroy_root_iterator(&iter); return result; - } static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; + TreeDbTerm **rootp; + CATreeRootIterator iter; int result; - DbTableCATreeNode *base; - init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack); - wunlock_base_node(&(base->u.base)); + + init_root_iterator(&tbl->catree, &iter, 1); + iter.next_route_key = key; + rootp = catree_find_prev_root(&iter, NULL); + + do { + init_tree_stack(&stack, stack_array, 0); + result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, + &stack); + if (result != DB_ERROR_NONE || *ret != am_EOT) + break; + rootp = catree_find_prev_root(&iter, NULL); + } while (rootp); + + destroy_root_iterator(&iter); return result; } -#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put, - ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS, - db_put_tree_common(&tb->common, - &base_node->root, - obj, - key_clash_fail, - NULL);) - static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail) { DbTableCATree *tb = &tbl->catree; Eterm key = GETKEY(&tb->common, tuple_val(obj)); - return erl_db_catree_do_operation_put(tb, - key, - obj, - key_clash_fail); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_put_tree_common(&tb->common, &node->u.base.root, obj, + key_clash_fail, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get, - ERL_DB_CATREE_DO_OPERATION_GET_PARAMS, - db_get_tree_common(p, &tb->common, - base_node->root, - key, ret, NULL);) - static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_get(tb, key, p, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_tree_common(p, &tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; +} + +TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) +{ + FindBaseNode fbn; + DbTableCATreeNode* base_node; + + while (1) { + base_node = find_base_node(iter->tb, key, &fbn); + lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level); + if (base_node->u.base.is_valid) + break; + unlock_iter_base_node(iter); + } + return &base_node->u.base.root; +} + + +TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + int current_level; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +static Eterm copy_iter_search_key(CATreeRootIterator* iter, Eterm key) +{ + Uint key_size; + + if (is_immed(key)) + return key; + + if (iter->search_key) { + ASSERT(key != iter->search_key->term); + destroy_route_key(iter->search_key); + } + key_size = size_object(key); + if (!iter->search_key || key_size > iter->search_key->size) { + iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP, + iter->search_key, + (offsetof(DbRouteKey, heap) + + key_size*sizeof(Eterm))); + } + return copy_route_key(iter->search_key, key, key_size); } -#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member, - ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS, - db_member_tree_common(&tb->common, - base_node->root, - key, - ret, - NULL);) +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next, + Eterm *keyp) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_invalid = NULL; + DbTableCATreeNode *rejected_empty = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + Eterm key = iter->next_route_key; + int current_level; + + if (iter->locked_bnode) { + if (keyp) + *keyp = copy_iter_search_key(iter, *keyp); + unlock_iter_base_node(iter); + } + + if (is_non_value(key)) + return NULL; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (next) { + if (cmp_key_route(key,node) < 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + else { + if (cmp_key_route(key,node) > 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + } + ASSERT(node != rejected_invalid); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + ASSERT(node != rejected_empty); + if (node->u.base.root) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = node; + return &node->u.base.root; + } + if (!next_route_node) { + unlock_iter_base_node(iter); + return NULL; + } + key = next_route_node->u.route.key.term; + IF_DEBUG(rejected_empty = node); + } + else + IF_DEBUG(rejected_invalid = node); + + /* Retry */ + unlock_iter_base_node(iter); + } +} + +TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp) +{ + return catree_find_nextprev_root(iter, 1, keyp); +} + +TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp) +{ + return catree_find_nextprev_root(iter, 0, keyp); +} + + +TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + int current_level; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter, + int first) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + int current_level; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + next_route_node = node; + node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node); + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, next_route_node, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 1); +} + +TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 0); +} static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_member(tb, key, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_member_tree_common(&tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element, - ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS, - db_get_element_tree_common(p, &tb->common, - base_node->root, - key, ndex, - ret, NULL)) - static int db_get_element_catree(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_element_tree_common(p, &tb->common, + node->u.base.root, + key, ndex, ret, NULL); + runlock_base_node(node); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase, - ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS, - db_erase_tree_common(tbl, - &base_node->root, - key, - ret, - NULL);) - static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_erase(tb, key, tbl, ret); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_tree_common(tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object, - ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS, - db_erase_object_tree_common(tbl, - &base_node->root, - object, - ret, - NULL);) - static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret) { DbTableCATree *tb = &tbl->catree; Eterm key = GETKEY(&tb->common, tuple_val(object)); - return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_object_tree_common(tbl, + &node->u.base.root, + object, + ret, + NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } @@ -1847,11 +1825,12 @@ static int db_slot_catree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_slot_tree_common(p, tbl, base->u.base.root, - slot_term, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter), + slot_term, ret, NULL, &iter); + destroy_root_iterator(&iter); return result; } @@ -1861,25 +1840,25 @@ static int db_select_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_continue_tree_common(p, &tbl->common, + continuation, ret, NULL, &iter); + destroy_root_iterator(&iter); return result; } - static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, reverse, ret, - NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret, + NULL, &iter); + destroy_root_iterator(&iter); return result; } @@ -1889,12 +1868,13 @@ static int db_select_count_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_continue_tree_common(p, tbl, + continuation, ret, NULL, + &iter); + destroy_root_iterator(&iter); return result; } @@ -1902,11 +1882,12 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + destroy_root_iterator(&iter); return result; } @@ -1915,11 +1896,13 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, int reversed, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, chunk_size, reversed, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_chunk_tree_common(p, tbl, + tid, pattern, chunk_size, reversed, ret, + NULL, &iter); + destroy_root_iterator(&iter); return result; } @@ -1931,12 +1914,13 @@ static int db_select_delete_continue_catree(Process *p, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root, - continuation, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &stack, &iter); + destroy_root_iterator(&iter); return result; } @@ -1946,12 +1930,14 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_tree_common(p, tbl, &base->u.base.root, - tid, pattern, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_tree_common(p, tbl, + tid, pattern, ret, &stack, + &iter); + destroy_root_iterator(&iter); return result; } @@ -1959,12 +1945,12 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_tree_common(p, &tbl->common, - &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + destroy_root_iterator(&iter); return result; } @@ -1972,26 +1958,24 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_continue_tree_common(p, tbl, continuation, ret, + NULL, &iter); + destroy_root_iterator(&iter); return result; } -#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take, - ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS, - db_take_tree_common(p, tbl, - &base_node->root, - key, ret, NULL);) - static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_take(tb, key, p, ret, tbl); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_take_tree_common(p, tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } /* @@ -2003,9 +1987,16 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) static void db_print_catree(fmtfn_t to, void *to_arg, int show, DbTable *tbl) { - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); - db_print_tree_common(to, to_arg, show, base->u.base.root, tbl); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(&tbl->catree, &iter, 1); + root = catree_find_first_root(&iter); + do { + db_print_tree_common(to, to_arg, show, *root, tbl); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); } /* Release all memory occupied by a single table */ @@ -2081,25 +2072,33 @@ static void db_foreach_offheap_catree(DbTable *tbl, void (*func)(ErlOffHeap *, void *), void *arg) { - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); - db_foreach_offheap_tree_common(base->u.base.root, func, arg); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(&tbl->catree, &iter, 1); + root = catree_find_first_root(&iter); + do { + db_foreach_offheap_tree_common(*root, func, arg); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); } static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj, DbUpdateHandle *handle) { DbTableCATree *tb = &tbl->catree; - int res; - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node); - res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key, + obj, handle, NULL); if (res == 0) { - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); } else { /* db_finalize_dbterm_catree will unlock */ - handle->lck = prev_node; - handle->lck2 = current_node; - handle->current_level = current_level; + handle->u.catree.base_node = node; + handle->u.catree.parent = fbn.parent; + handle->u.catree.current_level = fbn.current_level; } return res; } @@ -2107,12 +2106,10 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle) { DbTableCATree *tb = &(handle->tb->catree); - DbTableCATreeNode *prev_node = handle->lck; - DbTableCATreeNode *current_node = handle->lck2; - int current_level = handle->current_level; - DbTableCATreeBaseNode *base_node = ¤t_node->u.base; db_finalize_dbterm_tree_common(cret, handle, NULL); - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + wunlock_adapt_base_node(tb, handle->u.catree.base_node, + handle->u.catree.parent, + handle->u.catree.current_level); return; } @@ -2146,6 +2143,61 @@ void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable) } #endif /* ERTS_ENABLE_LOCK_COUNT */ +void db_catree_force_split(DbTableCATree* tb, int on) +{ + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(tb, &iter, 1); + root = catree_find_first_root(&iter); + do { + iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); + + if (on) + tb->common.status |= DB_CATREE_FORCE_SPLIT; + else + tb->common.status &= ~DB_CATREE_FORCE_SPLIT; +} + +void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats) +{ + DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT]; + DbTableCATreeNode* node; + Uint depth = 0; + + stats->route_nodes = 0; + stats->base_nodes = 0; + stats->max_depth = 0; + + node = GET_ROOT(tb); + do { + while (!node->is_base_node) { + stats->route_nodes++; + ASSERT(depth < sizeof(stack)/sizeof(*stack)); + stack[depth++] = node; /* PUSH parent */ + if (stats->max_depth < depth) + stats->max_depth = depth; + node = GET_LEFT(node); + } + stats->base_nodes++; + + while (depth > 0) { + DbTableCATreeNode* parent = stack[depth-1]; + if (node == GET_LEFT(parent)) { + node = GET_RIGHT(parent); + break; + } + else { + ASSERT(node == GET_RIGHT(parent)); + node = parent; + depth--; /* POP parent */ + } + } + } while (depth > 0); +} #ifdef HARDDEBUG |