diff options
author | Sverker Eriksson <[email protected]> | 2018-09-28 14:37:22 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-10-03 19:00:53 +0200 |
commit | a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9 (patch) | |
tree | 77c0a6b13cd4a793e6d5ef47028b916b54a5a1ec /erts/emulator/beam/erl_db_catree.c | |
parent | d88239752cdeacfac8858439334599b3ec471803 (diff) | |
download | otp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.tar.gz otp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.tar.bz2 otp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.zip |
erts: Do some refactoring in erl_db_catree.c
Fewer variables with shorter names
and prefer DbTableCATree over DbTableCommon.
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 653 |
1 files changed, 285 insertions, 368 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index 71768adcda..65ed7ab81d 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -235,8 +235,7 @@ DbTableMethod db_catree = #define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v)); /* Compares a key to the key in a route node */ -static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb, - Eterm key, +static ERTS_INLINE Sint cmp_key_route(Eterm key, DbTableCATreeNode *obj) { return CMP(key, GET_ROUTE_NODE_KEY(obj)); @@ -279,7 +278,7 @@ int less_than_two_elements(TreeDbTerm *root) * Inserts a TreeDbTerm into a tree. Returns the new root. */ static ERTS_INLINE -TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data, +TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb, TreeDbTerm *insert_to_root, TreeDbTerm *value_to_insert) { /* Non recursive insertion in AVL tree, building our own stack */ @@ -295,7 +294,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data, int dir; TreeDbTerm *p1, *p2, *p; - key = GETKEY(common_table_data, value_to_insert->dbterm.tpl); + key = GETKEY(tb, value_to_insert->dbterm.tpl); dstack[dpos++] = DIR_END; for (;;) @@ -305,7 +304,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data, (*this)->balance = 0; (*this)->left = (*this)->right = NULL; break; - } else if ((c = cmp_key(common_table_data, key, *this)) < 0) { + } else if ((c = cmp_key(&tb->common, key, *this)) < 0) { /* go lefts */ dstack[dpos++] = DIR_LEFT; tstack[tpos++] = this; @@ -392,7 +391,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data, * left_wb and the tree containing the rest of the keys in the write * back parameter right_wb. */ -static void split_tree(DbTableCommon *tb, +static void split_tree(DbTableCATree *tb, TreeDbTerm *root, TreeDbTerm **split_key_node_wb, TreeDbTerm **left_wb, @@ -413,9 +412,7 @@ static void split_tree(DbTableCommon *tb, split_node->left = NULL; right_root = split_node->right; split_node->right = NULL; - right_root = insert_TreeDbTerm(tb, - right_root, - split_node); + right_root = insert_TreeDbTerm(tb, right_root, split_node); *split_key_node_wb = split_node; *left_wb = left_root; *right_wb = right_root; @@ -700,15 +697,17 @@ void runlock_base_node(DbTableCATreeBaseNode *base_node) } static ERTS_INLINE -void lock_route_node(DbTableCATreeRouteNode *route_node) +void lock_route_node(DbTableCATreeNode *route_node) { - erts_mtx_lock(&route_node->lock); + ASSERT(!route_node->is_base_node); + erts_mtx_lock(&route_node->u.route.lock); } static ERTS_INLINE -void unlock_route_node(DbTableCATreeRouteNode *route_node) +void unlock_route_node(DbTableCATreeNode *route_node) { - erts_mtx_unlock(&route_node->lock); + ASSERT(!route_node->is_base_node); + erts_mtx_unlock(&route_node->u.route.lock); } @@ -722,7 +721,6 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node) int retry; \ DbTableCATreeNode *current_node; \ DbTableCATreeNode *prev_node; \ - DbTableCommon* common_table_data = &tb->common; \ DbTableCATreeBaseNode *base_node; \ int current_level; \ (void)prev_node; \ @@ -734,7 +732,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node) while ( ! current_node->is_base_node ) { \ current_level = current_level + 1; \ prev_node = current_node; \ - if (cmp_key_route(common_table_data,key,current_node) < 0) { \ + if (cmp_key_route(key,current_node) < 0) { \ current_node = GET_LEFT_ACQB(current_node); \ } else { \ current_node = GET_RIGHT_ACQB(current_node); \ @@ -753,7 +751,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node) #define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \ - split_catree(&tb->common, prev_node, current_node); \ + split_catree(tb, prev_node, current_node); \ } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \ join_catree(tb, prev_node, current_node); \ } else { \ @@ -787,24 +785,22 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node) static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb) { - DbTableCATreeNode *new_base_node_container = - erts_db_alloc(ERTS_ALC_T_DB_TABLE, - (DbTable *) tb, - sizeof(DbTableCATreeNode)); - DbTableCATreeBaseNode *new_base_node = - &new_base_node_container->u.base; + DbTableCATreeNode *p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, + (DbTable *) tb, + sizeof(DbTableCATreeNode)); erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; - new_base_node_container->is_base_node = 1; - new_base_node->root = NULL; + p->is_base_node = 1; + p->u.base.root = NULL; if (tb->common.type & DB_FREQ_READ) rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; if (erts_ets_rwmtx_spin_count >= 0) rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; - erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt, - "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB); - new_base_node->lock_statistics = 0; - new_base_node->is_valid = 1; - return new_base_node_container; + erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt, + "erl_db_catree_base_node", tb->common.the_name, + ERTS_LOCK_FLAGS_CATEGORY_DB); + p->u.base.lock_statistics = 0; + p->u.base.is_valid = 1; + return p; } static ERTS_INLINE Uint sizeof_route_node(Uint key_size) @@ -814,47 +810,37 @@ static ERTS_INLINE Uint sizeof_route_node(Uint key_size) } static DbTableCATreeNode* -create_catree_route_node(DbTableCommon * common_table_data, +create_catree_route_node(DbTableCATree *tb, DbTableCATreeNode *left, DbTableCATreeNode *right, DbTerm * keyTerm) { Eterm* top; - Eterm key = GETKEY(common_table_data,keyTerm->tpl); + Eterm key = GETKEY(tb,keyTerm->tpl); int key_size = size_object(key); - Uint offset = offsetof(DbTableCATreeNode,u.route.key); - size_t route_node_container_size = - offset + - sizeof(DbTerm) + - sizeof(Eterm)*key_size; ErlOffHeap tmp_offheap; - byte* new_route_node_container_bytes = - erts_db_alloc(ERTS_ALC_T_DB_TABLE, - (DbTable *) common_table_data, - route_node_container_size); - DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset); - DbTableCATreeNode *new_route_node_container = - (DbTableCATreeNode*)new_route_node_container_bytes; - DbTableCATreeRouteNode *new_route_node = - &new_route_node_container->u.route; + DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, + (DbTable *) tb, + sizeof_route_node(key_size)); + if (key_size != 0) { - newp->size = key_size; - top = &newp->tpl[1]; + p->u.route.key.size = key_size; + top = &p->u.route.key.tpl[1]; tmp_offheap.first = NULL; - newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap); - newp->first_oh = tmp_offheap.first; + p->u.route.key.tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap); + p->u.route.key.first_oh = tmp_offheap.first; } else { - newp->size = key_size; - newp->first_oh = NULL; - newp->tpl[0] = key; + p->u.route.key.size = key_size; + p->u.route.key.first_oh = NULL; + p->u.route.key.tpl[0] = key; } - new_route_node_container->is_base_node = 0; - new_route_node->is_valid = 1; - erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left); - erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right); - erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node", + p->is_base_node = 0; + p->u.route.is_valid = 1; + erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left); + erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right); + erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node", NIL, ERTS_LOCK_FLAGS_CATEGORY_DB); - return new_route_node_container; + return p; } static void do_free_base_node(void* vptr) @@ -895,29 +881,26 @@ static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p) /* * Returns the parent routing node of the specified - * route_node_container if such a routing node exists or NULL if - * route_node_container is attached to the root + * route node 'child' if such a parent exists + * or NULL if 'child' is attached to the root. */ static ERTS_INLINE DbTableCATreeNode * parent_of(DbTableCATree *tb, - DbTableCATreeNode *route_node_container) + DbTableCATreeNode *child) { + Eterm key = GET_ROUTE_NODE_KEY(child); + DbTableCATreeNode *current = GET_ROOT_ACQB(tb); + DbTableCATreeNode *prev = NULL; - Eterm key = GET_ROUTE_NODE_KEY(route_node_container); - DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb); - DbTableCATreeNode *prev_node = NULL; - if (current_node == route_node_container) { - return NULL; - } - while (current_node != route_node_container) { - prev_node = current_node; - if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) { - current_node = GET_LEFT_ACQB(current_node); + while (current != child) { + prev = current; + if (cmp_key_route(key, current) < 0) { + current = GET_LEFT_ACQB(current); } else { - current_node = GET_RIGHT_ACQB(current_node); + current = GET_RIGHT_ACQB(current); } } - return prev_node; + return prev; } @@ -952,11 +935,7 @@ leftmost_route_node(DbTableCATreeNode *root) prev_node = node; node = GET_LEFT_ACQB(node); } - if (prev_node == NULL) { - return NULL; - } else { - return prev_node; - } + return prev_node; } static ERTS_INLINE DbTableCATreeNode* @@ -968,11 +947,7 @@ rightmost_route_node(DbTableCATreeNode *root) prev_node = node; node = GET_RIGHT_ACQB(node); } - if (prev_node == NULL) { - return NULL; - } else { - return prev_node; - } + return prev_node; } static ERTS_INLINE DbTableCATreeNode* @@ -987,8 +962,7 @@ leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack) } static ERTS_INLINE DbTableCATreeNode* -get_next_base_node_and_path(DbTableCommon *common_table_data, - DbTableCATreeNode *base_node, +get_next_base_node_and_path(DbTableCATreeNode *base_node, CATreeNodeStack *stack) { if (EMPTY_NODE(stack)) { /* The parent of b is the root */ @@ -1004,7 +978,7 @@ get_next_base_node_and_path(DbTableCommon *common_table_data, POP_NODE(stack); while (!EMPTY_NODE(stack)) { if (TOP_NODE(stack)->u.route.is_valid && - cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) { + cmp_key_route(pkey, TOP_NODE(stack)) <= 0) { return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack); } else { POP_NODE(stack); @@ -1033,12 +1007,10 @@ lock_first_base_node(DbTable *tbl, CATreeNodeStack *search_stack_ptr, CATreeNodeStack *locked_base_nodes_stack_ptr) { - int retry; DbTableCATreeNode *current_node; DbTableCATreeBaseNode *base_node; DbTableCATree* tb = &tbl->catree; - do { - retry = 0; + while (1) { current_node = GET_ROOT_ACQB(tb); while ( ! current_node->is_base_node ) { PUSH_NODE(search_stack_ptr, current_node); @@ -1046,12 +1018,11 @@ lock_first_base_node(DbTable *tbl, } base_node = ¤t_node->u.base; rlock_base_node(base_node); - if ( ! base_node->is_valid ) { - /* Retry */ - runlock_base_node(base_node); - retry = 1; - } - } while(retry); + if (base_node->is_valid) + break; + /* Retry */ + runlock_base_node(base_node); + } push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); return current_node; } @@ -1065,19 +1036,20 @@ find_and_lock_next_base_node_and_path(DbTable *tbl, DbTableCATreeNode *current_node; DbTableCATreeBaseNode *base_node; CATreeNodeStack * tmp_stack_ptr; - DbTableCommon* common_table_data; - retry_find_and_lock_next_base_node: - current_node = TOP_NODE(locked_base_nodes_stack_ptr); - common_table_data = &tbl->common; - clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr); - current_node = - get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr); - if (current_node == NULL) { - return NULL; - } - base_node = ¤t_node->u.base; - rlock_base_node(base_node); - if ( ! base_node->is_valid ) { + + while (1) { + current_node = TOP_NODE(locked_base_nodes_stack_ptr); + clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr); + current_node = + get_next_base_node_and_path(current_node, *search_stack_ptr_ptr); + if (current_node == NULL) { + return NULL; + } + base_node = ¤t_node->u.base; + rlock_base_node(base_node); + if (base_node->is_valid) + break; + /* Retry */ runlock_base_node(base_node); /* Revert to previous state */ @@ -1085,10 +1057,9 @@ find_and_lock_next_base_node_and_path(DbTable *tbl, tmp_stack_ptr = *search_stack_ptr_ptr; *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr; *search_stack_copy_ptr_ptr = tmp_stack_ptr; - goto retry_find_and_lock_next_base_node; - } else { - push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); } + + push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); return base_node; } @@ -1165,13 +1136,12 @@ lock_base_node_with_key(DbTable *tbl, DbTableCATreeNode *current_node; DbTableCATreeBaseNode *base_node; DbTableCATree* tb = &tbl->catree; - DbTableCommon* common_table_data = &tbl->common; do { retry = 0; current_node = GET_ROOT_ACQB(tb); while ( ! current_node->is_base_node ) { PUSH_NODE(search_stack_ptr, current_node); - if( cmp_key_route(common_table_data,key,current_node) < 0 ) { + if( cmp_key_route(key,current_node) < 0 ) { current_node = GET_LEFT_ACQB(current_node); } else { current_node = GET_RIGHT_ACQB(current_node); @@ -1195,106 +1165,93 @@ lock_base_node_with_key(DbTable *tbl, * node to join with. */ static DbTableCATreeNode* -erl_db_catree_force_join_right(DbTableCommon *common_table_data, - DbTableCATreeNode *parent_container, - DbTableCATreeNode *base_container, +erl_db_catree_force_join_right(DbTableCATree *tb, + DbTableCATreeNode *parent, + DbTableCATreeNode *thiz, DbTableCATreeNode **result_parent_wb) { - DbTableCATreeRouteNode *parent; - DbTableCATreeNode *gparent_container; - DbTableCATreeRouteNode *gparent; - DbTableCATreeBaseNode *base = &base_container->u.base; - DbTableCATree *tb = (DbTableCATree *)common_table_data; - DbTableCATreeNode *neighbor_base_container; - DbTableCATreeBaseNode *neighbor_base; - DbTableCATreeNode *new_neighbor_base; - DbTableCATreeNode *neighbor_base_parent; + DbTableCATreeNode *gparent; + DbTableCATreeNode *neighbor; + DbTableCATreeNode *new_neighbor; + DbTableCATreeNode *neighbor_parent; int neighbour_not_valid; - if (parent_container == NULL) { + if (parent == NULL) { return NULL; } - parent = &parent_container->u.route; do { - neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container)); - neighbor_base = &neighbor_base_container->u.base; - wlock_base_node_no_stats(neighbor_base); - neighbour_not_valid = !neighbor_base->is_valid; + neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent)); + wlock_base_node_no_stats(&neighbor->u.base); + neighbour_not_valid = !neighbor->u.base.is_valid; if (neighbour_not_valid) { - wunlock_base_node(neighbor_base); + wunlock_base_node(&neighbor->u.base); } } while (neighbour_not_valid); lock_route_node(parent); - parent->is_valid = 0; - neighbor_base->is_valid = 0; - base->is_valid = 0; + parent->u.route.is_valid = 0; + neighbor->u.base.is_valid = 0; + thiz->u.base.is_valid = 0; gparent = NULL; - gparent_container = NULL; do { if (gparent != NULL) { unlock_route_node(gparent); } - gparent_container = parent_of(tb, parent_container); - if (gparent_container != NULL) { - gparent = &gparent_container->u.route; + gparent = parent_of(tb, parent); + if (gparent != NULL) { lock_route_node(gparent); - } else { - gparent = NULL; } - } while (gparent != NULL && !gparent->is_valid); + } while (gparent != NULL && !gparent->u.route.is_valid); if (gparent == NULL) { - SET_ROOT_RELB(tb, GET_RIGHT(parent_container)); - } else if (GET_LEFT(gparent_container) == parent_container) { - SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container)); + SET_ROOT_RELB(tb, GET_RIGHT(parent)); + } else if (GET_LEFT(gparent) == parent) { + SET_LEFT_RELB(gparent, GET_RIGHT(parent)); } else { - SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container)); + SET_RIGHT_RELB(gparent, GET_RIGHT(parent)); } unlock_route_node(parent); if (gparent != NULL) { unlock_route_node(gparent); } - new_neighbor_base = create_catree_base_node(tb); - new_neighbor_base->u.base.root = - join_trees(base->root, neighbor_base->root); - wlock_base_node_no_stats(&(new_neighbor_base->u.base)); - neighbor_base_parent = NULL; - if (GET_RIGHT(parent_container) == neighbor_base_container) { - neighbor_base_parent = gparent_container; + new_neighbor = create_catree_base_node(tb); + new_neighbor->u.base.root = + join_trees(thiz->u.base.root, neighbor->u.base.root); + wlock_base_node_no_stats(&(new_neighbor->u.base)); + if (GET_RIGHT(parent) == neighbor) { + neighbor_parent = gparent; } else { - neighbor_base_parent = - leftmost_route_node(GET_RIGHT(parent_container)); + neighbor_parent = leftmost_route_node(GET_RIGHT(parent)); } - if(neighbor_base_parent == NULL) { - SET_ROOT_RELB(tb, new_neighbor_base); - } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) { - SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base); + if(neighbor_parent == NULL) { + SET_ROOT_RELB(tb, new_neighbor); + } else if (GET_LEFT(neighbor_parent) == neighbor) { + SET_LEFT_RELB(neighbor_parent, new_neighbor); } else { - SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base); + SET_RIGHT_RELB(neighbor_parent, new_neighbor); } - wunlock_base_node(base); - wunlock_base_node(neighbor_base); + wunlock_base_node(&thiz->u.base); + wunlock_base_node(&neighbor->u.base); /* Free the parent and base */ - erts_schedule_db_free(common_table_data, + erts_schedule_db_free(&tb->common, do_free_route_node, - parent_container, - &parent->free_item, - sizeof_route_node(parent->key.size)); - erts_schedule_db_free(common_table_data, + parent, + &parent->u.route.free_item, + sizeof_route_node(parent->u.route.key.size)); + erts_schedule_db_free(&tb->common, do_free_base_node, - base_container, - &base->free_item, + thiz, + &thiz->u.base.free_item, sizeof(DbTableCATreeNode)); - erts_schedule_db_free(common_table_data, + erts_schedule_db_free(&tb->common, do_free_base_node, - neighbor_base_container, - &neighbor_base->free_item, + neighbor, + &neighbor->u.base.free_item, sizeof(DbTableCATreeNode)); - if (parent_container == neighbor_base_container) { - *result_parent_wb = gparent_container; + if (parent == neighbor) { + *result_parent_wb = gparent; } else { - *result_parent_wb = neighbor_base_parent; + *result_parent_wb = neighbor_parent; } - return new_neighbor_base; + return new_neighbor; } /* @@ -1303,250 +1260,224 @@ erl_db_catree_force_join_right(DbTableCommon *common_table_data, * locked state. */ static DbTableCATreeNode * -merge_to_one_locked_base_node(DbTableCommon * common_table_data) +merge_to_one_locked_base_node(DbTableCATree* tb) { - DbTableCATreeNode *parent_container; - DbTableCATreeNode *new_parent_container; - DbTableCATree *tb = (DbTableCATree *)common_table_data; - DbTableCATreeNode *base_container; - DbTableCATreeNode *new_base_container; + DbTableCATreeNode *parent; + DbTableCATreeNode *new_parent; + DbTableCATreeNode *base; + DbTableCATreeNode *new_base; int is_not_valid; /* Find first base node */ do { - parent_container = NULL; - base_container = GET_ROOT_ACQB(tb); - while ( ! base_container->is_base_node ) { - parent_container = base_container; - base_container = GET_LEFT_ACQB(base_container); + parent = NULL; + base = GET_ROOT_ACQB(tb); + while ( ! base->is_base_node ) { + parent = base; + base = GET_LEFT_ACQB(base); } - wlock_base_node_no_stats(&(base_container->u.base)); - is_not_valid = ! base_container->u.base.is_valid; + wlock_base_node_no_stats(&(base->u.base)); + is_not_valid = ! base->u.base.is_valid; if (is_not_valid) { - wunlock_base_node(&(base_container->u.base)); + wunlock_base_node(&(base->u.base)); } } while(is_not_valid); do { - new_base_container = erl_db_catree_force_join_right(common_table_data, - parent_container, - base_container, - &new_parent_container); - if (new_base_container != NULL) { - base_container = new_base_container; - parent_container = new_parent_container; + new_base = erl_db_catree_force_join_right(tb, + parent, + base, + &new_parent); + if (new_base != NULL) { + base = new_base; + parent = new_parent; } - } while(new_base_container != NULL); - return base_container; + } while(new_base != NULL); + return base; } static void join_catree(DbTableCATree *tb, - DbTableCATreeNode *parent_container, - DbTableCATreeNode *base_container) -{ - DbTableCATreeRouteNode *parent; - DbTableCATreeNode *gparent_container; - DbTableCATreeRouteNode *gparent; - DbTableCATreeBaseNode *base = &base_container->u.base; - DbTableCATreeNode *neighbor_base_container; - DbTableCATreeBaseNode *neighbor_base; - DbTableCATreeNode *new_neighbor_base; - DbTableCATreeNode *neighbor_base_parent; - if (parent_container == NULL) { - base->lock_statistics = 0; - wunlock_base_node(base); + DbTableCATreeNode *parent, + DbTableCATreeNode *thiz) +{ + DbTableCATreeNode *gparent; + DbTableCATreeNode *neighbor; + DbTableCATreeNode *new_neighbor; + DbTableCATreeNode *neighbor_parent; + + ASSERT(thiz->is_base_node); + if (parent == NULL) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(&thiz->u.base); return; } - parent = &parent_container->u.route; - if (GET_LEFT(parent_container) == base_container) { - neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container)); - neighbor_base = &neighbor_base_container->u.base; - if (try_wlock_base_node(neighbor_base)) { + ASSERT(!parent->is_base_node); + if (GET_LEFT(parent) == thiz) { + neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent)); + if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ - base->lock_statistics = 0; - wunlock_base_node(base); + thiz->u.base.lock_statistics = 0; + wunlock_base_node(&thiz->u.base); return; - } else if (!neighbor_base->is_valid) { - base->lock_statistics = 0; - wunlock_base_node(base); - wunlock_base_node(neighbor_base); + } else if (!neighbor->u.base.is_valid) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(&thiz->u.base); + wunlock_base_node(&neighbor->u.base); return; } else { lock_route_node(parent); - parent->is_valid = 0; - neighbor_base->is_valid = 0; - base->is_valid = 0; + parent->u.route.is_valid = 0; + neighbor->u.base.is_valid = 0; + thiz->u.base.is_valid = 0; gparent = NULL; - gparent_container = NULL; do { if (gparent != NULL) { unlock_route_node(gparent); } - gparent_container = parent_of(tb, parent_container); - if (gparent_container != NULL) { - gparent = &gparent_container->u.route; + gparent = parent_of(tb, parent); + if (gparent != NULL) lock_route_node(gparent); - } else { - gparent = NULL; - } - } while (gparent != NULL && !gparent->is_valid); + } while (gparent != NULL && !gparent->u.route.is_valid); + if (gparent == NULL) { - SET_ROOT_RELB(tb, GET_RIGHT(parent_container)); - } else if (GET_LEFT(gparent_container) == parent_container) { - SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container)); + SET_ROOT_RELB(tb, GET_RIGHT(parent)); + } else if (GET_LEFT(gparent) == parent) { + SET_LEFT_RELB(gparent, GET_RIGHT(parent)); } else { - SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container)); + SET_RIGHT_RELB(gparent, GET_RIGHT(parent)); } unlock_route_node(parent); if (gparent != NULL) { unlock_route_node(gparent); } - new_neighbor_base = create_catree_base_node(tb); - new_neighbor_base->u.base.root = - join_trees(base->root, neighbor_base->root); - neighbor_base_parent = NULL; - if (GET_RIGHT(parent_container) == neighbor_base_container) { - neighbor_base_parent = gparent_container; + new_neighbor = create_catree_base_node(tb); + new_neighbor->u.base.root = join_trees(thiz->u.base.root, + neighbor->u.base.root); + neighbor_parent = NULL; + if (GET_RIGHT(parent) == neighbor) { + neighbor_parent = gparent; } else { - neighbor_base_parent = - leftmost_route_node(GET_RIGHT(parent_container)); + neighbor_parent = leftmost_route_node(GET_RIGHT(parent)); } } } else { /* Symetric case */ - neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container)); - neighbor_base = &neighbor_base_container->u.base; - if (try_wlock_base_node(neighbor_base)) { + ASSERT(GET_RIGHT(parent) == thiz); + neighbor = rightmost_base_node(GET_LEFT_ACQB(parent)); + if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ - base->lock_statistics = 0; - wunlock_base_node(base); + thiz->u.base.lock_statistics = 0; + wunlock_base_node(&thiz->u.base); return; - } else if (!neighbor_base->is_valid) { - base->lock_statistics = 0; - wunlock_base_node(base); - wunlock_base_node(neighbor_base); + } else if (!neighbor->u.base.is_valid) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(&thiz->u.base); + wunlock_base_node(&neighbor->u.base); return; } else { lock_route_node(parent); - parent->is_valid = 0; - neighbor_base->is_valid = 0; - base->is_valid = 0; + parent->u.route.is_valid = 0; + neighbor->u.base.is_valid = 0; + thiz->u.base.is_valid = 0; gparent = NULL; - gparent_container = NULL; do { if (gparent != NULL) { unlock_route_node(gparent); } - gparent_container = parent_of(tb, parent_container); - if (gparent_container != NULL) { - gparent = &gparent_container->u.route; + gparent = parent_of(tb, parent); + if (gparent != NULL) { lock_route_node(gparent); } else { gparent = NULL; } - } while (gparent != NULL && !gparent->is_valid); + } while (gparent != NULL && !gparent->u.route.is_valid); if (gparent == NULL) { - SET_ROOT_RELB(tb, GET_LEFT(parent_container)); - } else if (GET_RIGHT(gparent_container) == parent_container) { - SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container)); + SET_ROOT_RELB(tb, GET_LEFT(parent)); + } else if (GET_RIGHT(gparent) == parent) { + SET_RIGHT_RELB(gparent, GET_LEFT(parent)); } else { - SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container)); + SET_LEFT_RELB(gparent, GET_LEFT(parent)); } unlock_route_node(parent); if (gparent != NULL) { unlock_route_node(gparent); } - new_neighbor_base = create_catree_base_node(tb); - new_neighbor_base->u.base.root = - join_trees(neighbor_base->root, base->root); - neighbor_base_parent = NULL; - if (GET_LEFT(parent_container) == neighbor_base_container) { - neighbor_base_parent = gparent_container; + new_neighbor = create_catree_base_node(tb); + new_neighbor->u.base.root = join_trees(neighbor->u.base.root, + thiz->u.base.root); + neighbor_parent = NULL; + if (GET_LEFT(parent) == neighbor) { + neighbor_parent = gparent; } else { - neighbor_base_parent = - rightmost_route_node(GET_LEFT(parent_container)); + neighbor_parent = + rightmost_route_node(GET_LEFT(parent)); } } } /* Link in new neighbor and free nodes that are no longer in the tree */ - if (neighbor_base_parent == NULL) { - SET_ROOT_RELB(tb, new_neighbor_base); - } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) { - SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base); + if (neighbor_parent == NULL) { + SET_ROOT_RELB(tb, new_neighbor); + } else if (GET_LEFT(neighbor_parent) == neighbor) { + SET_LEFT_RELB(neighbor_parent, new_neighbor); } else { - SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base); + SET_RIGHT_RELB(neighbor_parent, new_neighbor); } - wunlock_base_node(base); - wunlock_base_node(neighbor_base); + wunlock_base_node(&thiz->u.base); + wunlock_base_node(&neighbor->u.base); /* Free the parent and base */ erts_schedule_db_free(&tb->common, do_free_route_node, - parent_container, - &parent->free_item, - sizeof_route_node(parent->key.size)); + parent, + &parent->u.route.free_item, + sizeof_route_node(parent->u.route.key.size)); erts_schedule_db_free(&tb->common, do_free_base_node, - base_container, - &base->free_item, + thiz, + &thiz->u.base.free_item, sizeof(DbTableCATreeNode)); erts_schedule_db_free(&tb->common, do_free_base_node, - neighbor_base_container, - &neighbor_base->free_item, + neighbor, + &neighbor->u.base.free_item, sizeof(DbTableCATreeNode)); } - -static void split_catree(DbTableCommon *tb, - DbTableCATreeNode *parent_container, - DbTableCATreeNode *base_container) { +static void split_catree(DbTableCATree *tb, + DbTableCATreeNode *parent, + DbTableCATreeNode *base) { TreeDbTerm *splitOutWriteBack; - TreeDbTerm *leftWriteBack; - TreeDbTerm *rightWriteBack; - DbTableCATreeNode *left_base_node; - DbTableCATreeNode *right_base_node; - DbTableCATreeNode *routing_node_container; - DbTableCATreeBaseNode *base = &base_container->u.base; - DbTableCATreeRouteNode *parent; - if (parent_container == NULL) { - parent = NULL; - } else { - parent = &parent_container->u.route; - } + DbTableCATreeNode *new_left; + DbTableCATreeNode *new_right; + DbTableCATreeNode *new_route; - if (less_than_two_elements(base->root)) { - base->lock_statistics = 0; - wunlock_base_node(base); + if (less_than_two_elements(base->u.base.root)) { + base->u.base.lock_statistics = 0; + wunlock_base_node(&base->u.base); return; } else { - /* Split the tree */ + new_left = create_catree_base_node(tb); + new_right = create_catree_base_node(tb); + split_tree(tb, - base->root, + base->u.base.root, &splitOutWriteBack, - &leftWriteBack, - &rightWriteBack); - /* Create new base nodes */ - left_base_node = - create_catree_base_node((DbTableCATree*)tb); - right_base_node = - create_catree_base_node((DbTableCATree*)tb); - left_base_node->u.base.root = leftWriteBack; - right_base_node->u.base.root = rightWriteBack; - routing_node_container = create_catree_route_node(tb, - left_base_node, - right_base_node, - &splitOutWriteBack->dbterm); + &new_left->u.base.root, + &new_right->u.base.root); + new_route = create_catree_route_node(tb, + new_left, + new_right, + &splitOutWriteBack->dbterm); if (parent == NULL) { - SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container); - } else if(GET_LEFT(parent_container) == base_container) { - SET_LEFT_RELB(parent_container, routing_node_container); + SET_ROOT_RELB(tb, new_route); + } else if(GET_LEFT(parent) == base) { + SET_LEFT_RELB(parent, new_route); } else { - SET_RIGHT_RELB(parent_container, routing_node_container); + SET_RIGHT_RELB(parent, new_route); } - base->is_valid = 0; - wunlock_base_node(base); - erts_schedule_db_free(tb, + base->u.base.is_valid = 0; + wunlock_base_node(&base->u.base); + erts_schedule_db_free(&tb->common, do_free_base_node, - base_container, - &base->free_item, + base, + &base->u.base.free_item, sizeof(DbTableCATreeNode)); } } @@ -1729,8 +1660,7 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL); wunlock_base_node(&(base->u.base)); return result; @@ -1741,11 +1671,10 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tb->common); + base = merge_to_one_locked_base_node(&tbl->catree); result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack); wunlock_base_node(&(base->u.base)); return result; @@ -1848,10 +1777,9 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret) static int db_slot_catree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); + base = merge_to_one_locked_base_node(&tbl->catree); result = db_slot_tree_common(p, tbl, base->u.base.root, slot_term, ret, NULL); wunlock_base_node(&(base->u.base)); @@ -1863,11 +1791,10 @@ static int db_select_continue_catree(Process *p, Eterm continuation, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_continue_tree_common(p, &tb->common, &base->u.base.root, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root, continuation, ret, NULL); wunlock_base_node(&(base->u.base)); return result; @@ -1877,11 +1804,10 @@ static int db_select_continue_catree(Process *p, static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_tree_common(p, &tb->common, &base->u.base.root, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_tree_common(p, &tbl->common, &base->u.base.root, tid, pattern, reverse, ret, NULL); wunlock_base_node(&(base->u.base)); @@ -1893,11 +1819,10 @@ static int db_select_count_continue_catree(Process *p, Eterm continuation, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_count_continue_tree_common(p, &tb->common, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_count_continue_tree_common(p, &tbl->common, &base->u.base.root, continuation, ret, NULL); wunlock_base_node(&(base->u.base)); @@ -1907,11 +1832,10 @@ static int db_select_count_continue_catree(Process *p, static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_count_tree_common(p, &tb->common, &base->u.base.root, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root, tid, pattern, ret, NULL); wunlock_base_node(&(base->u.base)); return result; @@ -1921,11 +1845,10 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reversed, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_chunk_tree_common(p, &tb->common, &base->u.base.root, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root, tid, pattern, chunk_size, reversed, ret, NULL); wunlock_base_node(&(base->u.base)); return result; @@ -1936,13 +1859,12 @@ static int db_select_delete_continue_catree(Process *p, Eterm continuation, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; DbTableCATreeNode *base; init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tb->common); + base = merge_to_one_locked_base_node(&tbl->catree); result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root, continuation, ret, &stack); wunlock_base_node(&(base->u.base)); @@ -1952,13 +1874,12 @@ static int db_select_delete_continue_catree(Process *p, static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; DbTableCATreeNode *base; init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tb->common); + base = merge_to_one_locked_base_node(&tbl->catree); result = db_select_delete_tree_common(p, tbl, &base->u.base.root, tid, pattern, ret, &stack); wunlock_base_node(&(base->u.base)); @@ -1968,11 +1889,10 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_replace_tree_common(p, &tb->common, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_replace_tree_common(p, &tbl->common, &base->u.base.root, tid, pattern, ret, NULL); wunlock_base_node(&(base->u.base)); @@ -1982,11 +1902,10 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_replace_continue_catree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { - DbTableCATree *tb = &tbl->catree; int result; DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tb->common); - result = db_select_replace_continue_tree_common(p, &tb->common, + base = merge_to_one_locked_base_node(&tbl->catree); + result = db_select_replace_continue_tree_common(p, &tbl->common, &base->u.base.root, continuation, ret, NULL); wunlock_base_node(&(base->u.base)); @@ -2015,8 +1934,7 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) static void db_print_catree(fmtfn_t to, void *to_arg, int show, DbTable *tbl) { - DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); db_print_tree_common(to, to_arg, show, base->u.base.root, tbl); wunlock_base_node(&(base->u.base)); } @@ -2094,8 +2012,7 @@ static void db_foreach_offheap_catree(DbTable *tbl, void (*func)(ErlOffHeap *, void *), void *arg) { - DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree); db_foreach_offheap_tree_common(base->u.base.root, func, arg); wunlock_base_node(&(base->u.base)); } |