aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-09-28 14:37:22 +0200
committerSverker Eriksson <[email protected]>2018-10-03 19:00:53 +0200
commita875b2991bebe68cbb1ce1d0cb0b338fc3248cc9 (patch)
tree77c0a6b13cd4a793e6d5ef47028b916b54a5a1ec /erts/emulator/beam/erl_db_catree.c
parentd88239752cdeacfac8858439334599b3ec471803 (diff)
downloadotp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.tar.gz
otp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.tar.bz2
otp-a875b2991bebe68cbb1ce1d0cb0b338fc3248cc9.zip
erts: Do some refactoring in erl_db_catree.c
Fewer variables with shorter names and prefer DbTableCATree over DbTableCommon.
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c653
1 files changed, 285 insertions, 368 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index 71768adcda..65ed7ab81d 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -235,8 +235,7 @@ DbTableMethod db_catree =
#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Compares a key to the key in a route node */
-static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
- Eterm key,
+static ERTS_INLINE Sint cmp_key_route(Eterm key,
DbTableCATreeNode *obj)
{
return CMP(key, GET_ROUTE_NODE_KEY(obj));
@@ -279,7 +278,7 @@ int less_than_two_elements(TreeDbTerm *root)
* Inserts a TreeDbTerm into a tree. Returns the new root.
*/
static ERTS_INLINE
-TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
+TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
TreeDbTerm *insert_to_root,
TreeDbTerm *value_to_insert) {
/* Non recursive insertion in AVL tree, building our own stack */
@@ -295,7 +294,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
int dir;
TreeDbTerm *p1, *p2, *p;
- key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);
+ key = GETKEY(tb, value_to_insert->dbterm.tpl);
dstack[dpos++] = DIR_END;
for (;;)
@@ -305,7 +304,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
(*this)->balance = 0;
(*this)->left = (*this)->right = NULL;
break;
- } else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
+ } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
/* go lefts */
dstack[dpos++] = DIR_LEFT;
tstack[tpos++] = this;
@@ -392,7 +391,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
* left_wb and the tree containing the rest of the keys in the write
* back parameter right_wb.
*/
-static void split_tree(DbTableCommon *tb,
+static void split_tree(DbTableCATree *tb,
TreeDbTerm *root,
TreeDbTerm **split_key_node_wb,
TreeDbTerm **left_wb,
@@ -413,9 +412,7 @@ static void split_tree(DbTableCommon *tb,
split_node->left = NULL;
right_root = split_node->right;
split_node->right = NULL;
- right_root = insert_TreeDbTerm(tb,
- right_root,
- split_node);
+ right_root = insert_TreeDbTerm(tb, right_root, split_node);
*split_key_node_wb = split_node;
*left_wb = left_root;
*right_wb = right_root;
@@ -700,15 +697,17 @@ void runlock_base_node(DbTableCATreeBaseNode *base_node)
}
static ERTS_INLINE
-void lock_route_node(DbTableCATreeRouteNode *route_node)
+void lock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_lock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_lock(&route_node->u.route.lock);
}
static ERTS_INLINE
-void unlock_route_node(DbTableCATreeRouteNode *route_node)
+void unlock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_unlock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_unlock(&route_node->u.route.lock);
}
@@ -722,7 +721,6 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
int retry; \
DbTableCATreeNode *current_node; \
DbTableCATreeNode *prev_node; \
- DbTableCommon* common_table_data = &tb->common; \
DbTableCATreeBaseNode *base_node; \
int current_level; \
(void)prev_node; \
@@ -734,7 +732,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
while ( ! current_node->is_base_node ) { \
current_level = current_level + 1; \
prev_node = current_node; \
- if (cmp_key_route(common_table_data,key,current_node) < 0) { \
+ if (cmp_key_route(key,current_node) < 0) { \
current_node = GET_LEFT_ACQB(current_node); \
} else { \
current_node = GET_RIGHT_ACQB(current_node); \
@@ -753,7 +751,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
&& current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(&tb->common, prev_node, current_node); \
+ split_catree(tb, prev_node, current_node); \
} else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
join_catree(tb, prev_node, current_node); \
} else { \
@@ -787,24 +785,22 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
{
- DbTableCATreeNode *new_base_node_container =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) tb,
- sizeof(DbTableCATreeNode));
- DbTableCATreeBaseNode *new_base_node =
- &new_base_node_container->u.base;
+ DbTableCATreeNode *p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof(DbTableCATreeNode));
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
- new_base_node_container->is_base_node = 1;
- new_base_node->root = NULL;
+ p->is_base_node = 1;
+ p->u.base.root = NULL;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
- erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
- "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
- new_base_node->lock_statistics = 0;
- new_base_node->is_valid = 1;
- return new_base_node_container;
+ erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
+ "erl_db_catree_base_node", tb->common.the_name,
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ p->u.base.lock_statistics = 0;
+ p->u.base.is_valid = 1;
+ return p;
}
static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
@@ -814,47 +810,37 @@ static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
}
static DbTableCATreeNode*
-create_catree_route_node(DbTableCommon * common_table_data,
+create_catree_route_node(DbTableCATree *tb,
DbTableCATreeNode *left,
DbTableCATreeNode *right,
DbTerm * keyTerm)
{
Eterm* top;
- Eterm key = GETKEY(common_table_data,keyTerm->tpl);
+ Eterm key = GETKEY(tb,keyTerm->tpl);
int key_size = size_object(key);
- Uint offset = offsetof(DbTableCATreeNode,u.route.key);
- size_t route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
ErlOffHeap tmp_offheap;
- byte* new_route_node_container_bytes =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) common_table_data,
- route_node_container_size);
- DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
- DbTableCATreeNode *new_route_node_container =
- (DbTableCATreeNode*)new_route_node_container_bytes;
- DbTableCATreeRouteNode *new_route_node =
- &new_route_node_container->u.route;
+ DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof_route_node(key_size));
+
if (key_size != 0) {
- newp->size = key_size;
- top = &newp->tpl[1];
+ p->u.route.key.size = key_size;
+ top = &p->u.route.key.tpl[1];
tmp_offheap.first = NULL;
- newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
- newp->first_oh = tmp_offheap.first;
+ p->u.route.key.tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
+ p->u.route.key.first_oh = tmp_offheap.first;
} else {
- newp->size = key_size;
- newp->first_oh = NULL;
- newp->tpl[0] = key;
+ p->u.route.key.size = key_size;
+ p->u.route.key.first_oh = NULL;
+ p->u.route.key.tpl[0] = key;
}
- new_route_node_container->is_base_node = 0;
- new_route_node->is_valid = 1;
- erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
- erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
- erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
+ p->is_base_node = 0;
+ p->u.route.is_valid = 1;
+ erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
+ erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
+ erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
- return new_route_node_container;
+ return p;
}
static void do_free_base_node(void* vptr)
@@ -895,29 +881,26 @@ static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
/*
* Returns the parent routing node of the specified
- * route_node_container if such a routing node exists or NULL if
- * route_node_container is attached to the root
+ * route node 'child' if such a parent exists
+ * or NULL if 'child' is attached to the root.
*/
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
- DbTableCATreeNode *route_node_container)
+ DbTableCATreeNode *child)
{
+ Eterm key = GET_ROUTE_NODE_KEY(child);
+ DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev = NULL;
- Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
- DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
- DbTableCATreeNode *prev_node = NULL;
- if (current_node == route_node_container) {
- return NULL;
- }
- while (current_node != route_node_container) {
- prev_node = current_node;
- if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
- current_node = GET_LEFT_ACQB(current_node);
+ while (current != child) {
+ prev = current;
+ if (cmp_key_route(key, current) < 0) {
+ current = GET_LEFT_ACQB(current);
} else {
- current_node = GET_RIGHT_ACQB(current_node);
+ current = GET_RIGHT_ACQB(current);
}
}
- return prev_node;
+ return prev;
}
@@ -952,11 +935,7 @@ leftmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_LEFT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -968,11 +947,7 @@ rightmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_RIGHT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -987,8 +962,7 @@ leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
}
static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCommon *common_table_data,
- DbTableCATreeNode *base_node,
+get_next_base_node_and_path(DbTableCATreeNode *base_node,
CATreeNodeStack *stack)
{
if (EMPTY_NODE(stack)) { /* The parent of b is the root */
@@ -1004,7 +978,7 @@ get_next_base_node_and_path(DbTableCommon *common_table_data,
POP_NODE(stack);
while (!EMPTY_NODE(stack)) {
if (TOP_NODE(stack)->u.route.is_valid &&
- cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
+ cmp_key_route(pkey, TOP_NODE(stack)) <= 0) {
return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
} else {
POP_NODE(stack);
@@ -1033,12 +1007,10 @@ lock_first_base_node(DbTable *tbl,
CATreeNodeStack *search_stack_ptr,
CATreeNodeStack *locked_base_nodes_stack_ptr)
{
- int retry;
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
+ while (1) {
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
@@ -1046,12 +1018,11 @@ lock_first_base_node(DbTable *tbl,
}
base_node = &current_node->u.base;
rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
+ if (base_node->is_valid)
+ break;
+ /* Retry */
+ runlock_base_node(base_node);
+ }
push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return current_node;
}
@@ -1065,19 +1036,20 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
CATreeNodeStack * tmp_stack_ptr;
- DbTableCommon* common_table_data;
- retry_find_and_lock_next_base_node:
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- common_table_data = &tbl->common;
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
+
+ while (1) {
+ current_node = TOP_NODE(locked_base_nodes_stack_ptr);
+ clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
+ current_node =
+ get_next_base_node_and_path(current_node, *search_stack_ptr_ptr);
+ if (current_node == NULL) {
+ return NULL;
+ }
+ base_node = &current_node->u.base;
+ rlock_base_node(base_node);
+ if (base_node->is_valid)
+ break;
+
/* Retry */
runlock_base_node(base_node);
/* Revert to previous state */
@@ -1085,10 +1057,9 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
tmp_stack_ptr = *search_stack_ptr_ptr;
*search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
*search_stack_copy_ptr_ptr = tmp_stack_ptr;
- goto retry_find_and_lock_next_base_node;
- } else {
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
}
+
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return base_node;
}
@@ -1165,13 +1136,12 @@ lock_base_node_with_key(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- DbTableCommon* common_table_data = &tbl->common;
do {
retry = 0;
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
+ if( cmp_key_route(key,current_node) < 0 ) {
current_node = GET_LEFT_ACQB(current_node);
} else {
current_node = GET_RIGHT_ACQB(current_node);
@@ -1195,106 +1165,93 @@ lock_base_node_with_key(DbTable *tbl,
* node to join with.
*/
static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCommon *common_table_data,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container,
+erl_db_catree_force_join_right(DbTableCATree *tb,
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz,
DbTableCATreeNode **result_parent_wb)
{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->u.base;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
int neighbour_not_valid;
- if (parent_container == NULL) {
+ if (parent == NULL) {
return NULL;
}
- parent = &parent_container->u.route;
do {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->u.base;
- wlock_base_node_no_stats(neighbor_base);
- neighbour_not_valid = !neighbor_base->is_valid;
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ wlock_base_node_no_stats(&neighbor->u.base);
+ neighbour_not_valid = !neighbor->u.base.is_valid;
if (neighbour_not_valid) {
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&neighbor->u.base);
}
} while (neighbour_not_valid);
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->u.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
lock_route_node(gparent);
- } else {
- gparent = NULL;
}
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->u.base.root =
- join_trees(base->root, neighbor_base->root);
- wlock_base_node_no_stats(&(new_neighbor_base->u.base));
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ new_neighbor = create_catree_base_node(tb);
+ new_neighbor->u.base.root =
+ join_trees(thiz->u.base.root, neighbor->u.base.root);
+ wlock_base_node_no_stats(&(new_neighbor->u.base));
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
- if(neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if(neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
- erts_schedule_db_free(common_table_data,
+ erts_schedule_db_free(&tb->common,
do_free_route_node,
- parent_container,
- &parent->free_item,
- sizeof_route_node(parent->key.size));
- erts_schedule_db_free(common_table_data,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
do_free_base_node,
- base_container,
- &base->free_item,
+ thiz,
+ &thiz->u.base.free_item,
sizeof(DbTableCATreeNode));
- erts_schedule_db_free(common_table_data,
+ erts_schedule_db_free(&tb->common,
do_free_base_node,
- neighbor_base_container,
- &neighbor_base->free_item,
+ neighbor,
+ &neighbor->u.base.free_item,
sizeof(DbTableCATreeNode));
- if (parent_container == neighbor_base_container) {
- *result_parent_wb = gparent_container;
+ if (parent == neighbor) {
+ *result_parent_wb = gparent;
} else {
- *result_parent_wb = neighbor_base_parent;
+ *result_parent_wb = neighbor_parent;
}
- return new_neighbor_base;
+ return new_neighbor;
}
/*
@@ -1303,250 +1260,224 @@ erl_db_catree_force_join_right(DbTableCommon *common_table_data,
* locked state.
*/
static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCommon * common_table_data)
+merge_to_one_locked_base_node(DbTableCATree* tb)
{
- DbTableCATreeNode *parent_container;
- DbTableCATreeNode *new_parent_container;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *base_container;
- DbTableCATreeNode *new_base_container;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode *new_parent;
+ DbTableCATreeNode *base;
+ DbTableCATreeNode *new_base;
int is_not_valid;
/* Find first base node */
do {
- parent_container = NULL;
- base_container = GET_ROOT_ACQB(tb);
- while ( ! base_container->is_base_node ) {
- parent_container = base_container;
- base_container = GET_LEFT_ACQB(base_container);
+ parent = NULL;
+ base = GET_ROOT_ACQB(tb);
+ while ( ! base->is_base_node ) {
+ parent = base;
+ base = GET_LEFT_ACQB(base);
}
- wlock_base_node_no_stats(&(base_container->u.base));
- is_not_valid = ! base_container->u.base.is_valid;
+ wlock_base_node_no_stats(&(base->u.base));
+ is_not_valid = ! base->u.base.is_valid;
if (is_not_valid) {
- wunlock_base_node(&(base_container->u.base));
+ wunlock_base_node(&(base->u.base));
}
} while(is_not_valid);
do {
- new_base_container = erl_db_catree_force_join_right(common_table_data,
- parent_container,
- base_container,
- &new_parent_container);
- if (new_base_container != NULL) {
- base_container = new_base_container;
- parent_container = new_parent_container;
+ new_base = erl_db_catree_force_join_right(tb,
+ parent,
+ base,
+ &new_parent);
+ if (new_base != NULL) {
+ base = new_base;
+ parent = new_parent;
}
- } while(new_base_container != NULL);
- return base_container;
+ } while(new_base != NULL);
+ return base;
}
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container)
-{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->u.base;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- if (parent_container == NULL) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz)
+{
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+
+ ASSERT(thiz->is_base_node);
+ if (parent == NULL) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
}
- parent = &parent_container->u.route;
- if (GET_LEFT(parent_container) == base_container) {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->u.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(!parent->is_base_node);
+ if (GET_LEFT(parent) == thiz) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->u.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL)
lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->u.base.root =
- join_trees(base->root, neighbor_base->root);
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ new_neighbor = create_catree_base_node(tb);
+ new_neighbor->u.base.root = join_trees(thiz->u.base.root,
+ neighbor->u.base.root);
+ neighbor_parent = NULL;
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
}
} else { /* Symetric case */
- neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->u.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(GET_RIGHT(parent) == thiz);
+ neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->u.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
lock_route_node(gparent);
} else {
gparent = NULL;
}
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_LEFT(parent_container));
- } else if (GET_RIGHT(gparent_container) == parent_container) {
- SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_ROOT_RELB(tb, GET_LEFT(parent));
+ } else if (GET_RIGHT(gparent) == parent) {
+ SET_RIGHT_RELB(gparent, GET_LEFT(parent));
} else {
- SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_LEFT_RELB(gparent, GET_LEFT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->u.base.root =
- join_trees(neighbor_base->root, base->root);
- neighbor_base_parent = NULL;
- if (GET_LEFT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ new_neighbor = create_catree_base_node(tb);
+ new_neighbor->u.base.root = join_trees(neighbor->u.base.root,
+ thiz->u.base.root);
+ neighbor_parent = NULL;
+ if (GET_LEFT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- rightmost_route_node(GET_LEFT(parent_container));
+ neighbor_parent =
+ rightmost_route_node(GET_LEFT(parent));
}
}
}
/* Link in new neighbor and free nodes that are no longer in the tree */
- if (neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if (neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
erts_schedule_db_free(&tb->common,
do_free_route_node,
- parent_container,
- &parent->free_item,
- sizeof_route_node(parent->key.size));
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
erts_schedule_db_free(&tb->common,
do_free_base_node,
- base_container,
- &base->free_item,
+ thiz,
+ &thiz->u.base.free_item,
sizeof(DbTableCATreeNode));
erts_schedule_db_free(&tb->common,
do_free_base_node,
- neighbor_base_container,
- &neighbor_base->free_item,
+ neighbor,
+ &neighbor->u.base.free_item,
sizeof(DbTableCATreeNode));
}
-
-static void split_catree(DbTableCommon *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container) {
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *base) {
TreeDbTerm *splitOutWriteBack;
- TreeDbTerm *leftWriteBack;
- TreeDbTerm *rightWriteBack;
- DbTableCATreeNode *left_base_node;
- DbTableCATreeNode *right_base_node;
- DbTableCATreeNode *routing_node_container;
- DbTableCATreeBaseNode *base = &base_container->u.base;
- DbTableCATreeRouteNode *parent;
- if (parent_container == NULL) {
- parent = NULL;
- } else {
- parent = &parent_container->u.route;
- }
+ DbTableCATreeNode *new_left;
+ DbTableCATreeNode *new_right;
+ DbTableCATreeNode *new_route;
- if (less_than_two_elements(base->root)) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ if (less_than_two_elements(base->u.base.root)) {
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(&base->u.base);
return;
} else {
- /* Split the tree */
+ new_left = create_catree_base_node(tb);
+ new_right = create_catree_base_node(tb);
+
split_tree(tb,
- base->root,
+ base->u.base.root,
&splitOutWriteBack,
- &leftWriteBack,
- &rightWriteBack);
- /* Create new base nodes */
- left_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- right_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- left_base_node->u.base.root = leftWriteBack;
- right_base_node->u.base.root = rightWriteBack;
- routing_node_container = create_catree_route_node(tb,
- left_base_node,
- right_base_node,
- &splitOutWriteBack->dbterm);
+ &new_left->u.base.root,
+ &new_right->u.base.root);
+ new_route = create_catree_route_node(tb,
+ new_left,
+ new_right,
+ &splitOutWriteBack->dbterm);
if (parent == NULL) {
- SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
- } else if(GET_LEFT(parent_container) == base_container) {
- SET_LEFT_RELB(parent_container, routing_node_container);
+ SET_ROOT_RELB(tb, new_route);
+ } else if(GET_LEFT(parent) == base) {
+ SET_LEFT_RELB(parent, new_route);
} else {
- SET_RIGHT_RELB(parent_container, routing_node_container);
+ SET_RIGHT_RELB(parent, new_route);
}
- base->is_valid = 0;
- wunlock_base_node(base);
- erts_schedule_db_free(tb,
+ base->u.base.is_valid = 0;
+ wunlock_base_node(&base->u.base);
+ erts_schedule_db_free(&tb->common,
do_free_base_node,
- base_container,
- &base->free_item,
+ base,
+ &base->u.base.free_item,
sizeof(DbTableCATreeNode));
}
}
@@ -1729,8 +1660,7 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
wunlock_base_node(&(base->u.base));
return result;
@@ -1741,11 +1671,10 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
+ base = merge_to_one_locked_base_node(&tbl->catree);
result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
wunlock_base_node(&(base->u.base));
return result;
@@ -1848,10 +1777,9 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
+ base = merge_to_one_locked_base_node(&tbl->catree);
result = db_slot_tree_common(p, tbl, base->u.base.root,
slot_term, ret, NULL);
wunlock_base_node(&(base->u.base));
@@ -1863,11 +1791,10 @@ static int db_select_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_continue_tree_common(p, &tb->common, &base->u.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
continuation, ret, NULL);
wunlock_base_node(&(base->u.base));
return result;
@@ -1877,11 +1804,10 @@ static int db_select_continue_catree(Process *p,
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_tree_common(p, &tb->common, &base->u.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, reverse, ret,
NULL);
wunlock_base_node(&(base->u.base));
@@ -1893,11 +1819,10 @@ static int db_select_count_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_continue_tree_common(p, &tb->common,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_continue_tree_common(p, &tbl->common,
&base->u.base.root,
continuation, ret, NULL);
wunlock_base_node(&(base->u.base));
@@ -1907,11 +1832,10 @@ static int db_select_count_continue_catree(Process *p,
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_tree_common(p, &tb->common, &base->u.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, ret, NULL);
wunlock_base_node(&(base->u.base));
return result;
@@ -1921,11 +1845,10 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reversed, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_chunk_tree_common(p, &tb->common, &base->u.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, chunk_size, reversed, ret, NULL);
wunlock_base_node(&(base->u.base));
return result;
@@ -1936,13 +1859,12 @@ static int db_select_delete_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
+ base = merge_to_one_locked_base_node(&tbl->catree);
result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
continuation, ret, &stack);
wunlock_base_node(&(base->u.base));
@@ -1952,13 +1874,12 @@ static int db_select_delete_continue_catree(Process *p,
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
+ base = merge_to_one_locked_base_node(&tbl->catree);
result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
tid, pattern, ret, &stack);
wunlock_base_node(&(base->u.base));
@@ -1968,11 +1889,10 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_tree_common(p, &tb->common,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_tree_common(p, &tbl->common,
&base->u.base.root,
tid, pattern, ret, NULL);
wunlock_base_node(&(base->u.base));
@@ -1982,11 +1902,10 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_continue_tree_common(p, &tb->common,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_continue_tree_common(p, &tbl->common,
&base->u.base.root,
continuation, ret, NULL);
wunlock_base_node(&(base->u.base));
@@ -2015,8 +1934,7 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
wunlock_base_node(&(base->u.base));
}
@@ -2094,8 +2012,7 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
db_foreach_offheap_tree_common(base->u.base.root, func, arg);
wunlock_base_node(&(base->u.base));
}