aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c977
1 files changed, 488 insertions, 489 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index 37a299df35..a8e48bce1b 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -210,33 +210,32 @@ DbTableMethod db_catree =
* Internal CA tree related helper functions and macros
*/
-#define GET_ROUTE_NODE_KEY(node) (node->baseOrRoute.route.key.tpl[0])
-#define GET_BASE_NODE_LOCK(node) (&(node->baseOrRoute.base.lock))
-#define GET_ROUTE_NODE_LOCK(node) (&(node->baseOrRoute.route.lock))
+#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term)
+#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
+#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))
/* Helpers for reading and writing shared atomic variables */
/* No memory barrier */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
-#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Release or acquire barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
-#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Compares a key to the key in a route node */
-static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
- Eterm key,
+static ERTS_INLINE Sint cmp_key_route(Eterm key,
DbTableCATreeNode *obj)
{
return CMP(key, GET_ROUTE_NODE_KEY(obj));
@@ -279,7 +278,7 @@ int less_than_two_elements(TreeDbTerm *root)
* Inserts a TreeDbTerm into a tree. Returns the new root.
*/
static ERTS_INLINE
-TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
+TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
TreeDbTerm *insert_to_root,
TreeDbTerm *value_to_insert) {
/* Non recursive insertion in AVL tree, building our own stack */
@@ -295,7 +294,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
int dir;
TreeDbTerm *p1, *p2, *p;
- key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);
+ key = GETKEY(tb, value_to_insert->dbterm.tpl);
dstack[dpos++] = DIR_END;
for (;;)
@@ -305,7 +304,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
(*this)->balance = 0;
(*this)->left = (*this)->right = NULL;
break;
- } else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
+ } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
/* go lefts */
dstack[dpos++] = DIR_LEFT;
tstack[tpos++] = this;
@@ -392,7 +391,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
* left_wb and the tree containing the rest of the keys in the write
* back parameter right_wb.
*/
-static void split_tree(DbTableCommon *tb,
+static void split_tree(DbTableCATree *tb,
TreeDbTerm *root,
TreeDbTerm **split_key_node_wb,
TreeDbTerm **left_wb,
@@ -413,9 +412,7 @@ static void split_tree(DbTableCommon *tb,
split_node->left = NULL;
right_root = split_node->right;
split_node->right = NULL;
- right_root = insert_TreeDbTerm(tb,
- right_root,
- split_node);
+ right_root = insert_TreeDbTerm(tb, right_root, split_node);
*split_key_node_wb = split_node;
*left_wb = left_root;
*right_wb = right_root;
@@ -700,15 +697,17 @@ void runlock_base_node(DbTableCATreeBaseNode *base_node)
}
static ERTS_INLINE
-void lock_route_node(DbTableCATreeRouteNode *route_node)
+void lock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_lock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_lock(&route_node->u.route.lock);
}
static ERTS_INLINE
-void unlock_route_node(DbTableCATreeRouteNode *route_node)
+void unlock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_unlock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_unlock(&route_node->u.route.lock);
}
@@ -722,7 +721,6 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
int retry; \
DbTableCATreeNode *current_node; \
DbTableCATreeNode *prev_node; \
- DbTableCommon* common_table_data = &tb->common; \
DbTableCATreeBaseNode *base_node; \
int current_level; \
(void)prev_node; \
@@ -734,13 +732,13 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
while ( ! current_node->is_base_node ) { \
current_level = current_level + 1; \
prev_node = current_node; \
- if (cmp_key_route(common_table_data,key,current_node) < 0) { \
+ if (cmp_key_route(key,current_node) < 0) { \
current_node = GET_LEFT_ACQB(current_node); \
} else { \
current_node = GET_RIGHT_ACQB(current_node); \
} \
} \
- base_node = &current_node->baseOrRoute.base; \
+ base_node = &current_node->u.base; \
LOCK(base_node); \
if ( ! base_node->is_valid ) { \
/* Retry */ \
@@ -753,7 +751,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
&& current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(&tb->common, prev_node, current_node); \
+ split_catree(tb, prev_node, current_node); \
} else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
join_catree(tb, prev_node, current_node); \
} else { \
@@ -784,141 +782,189 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
}
+static ERTS_INLINE
+void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+{
+ dst->size = key_size;
+ if (key_size != 0) {
+ Eterm* hp = &dst->heap[0];
+ ErlOffHeap tmp_offheap;
+ tmp_offheap.first = NULL;
+ dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
+ dst->oh = tmp_offheap.first;
+ }
+ else {
+ ASSERT(is_immed(key));
+ dst->term = key;
+ dst->oh = NULL;
+ }
+}
-static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
+static ERTS_INLINE
+void destroy_route_key(DbRouteKey* key)
{
- DbTableCATreeNode *new_base_node_container =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) tb,
- sizeof(DbTableCATreeNode));
- DbTableCATreeBaseNode *new_base_node =
- &new_base_node_container->baseOrRoute.base;
+ if (key->oh) {
+ ErlOffHeap oh;
+ oh.first = key->oh;
+ erts_cleanup_offheap(&oh);
+ }
+}
+
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+# define sizeof_base_node(KEY_SZ) \
+ (offsetof(DbTableCATreeNode, u.base.lc_key.heap) \
+ + (KEY_SZ)*sizeof(Eterm))
+# define LC_ORDER(ORDER) ORDER
+#else
+# define sizeof_base_node(KEY_SZ) \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+# define LC_ORDER(ORDER) NIL
+#endif
+
+static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
+ TreeDbTerm* root,
+ Eterm lc_key)
+{
+ DbTableCATreeNode *p;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
- new_base_node_container->is_base_node = 1;
- new_base_node->root = NULL;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ Eterm lc_key_size = size_object(lc_key);
+#endif
+ p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
+ sizeof_base_node(lc_key_size));
+
+ p->is_base_node = 1;
+ p->u.base.root = root;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
- erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
- "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
- new_base_node->lock_statistics = 0;
- new_base_node->is_valid = 1;
- new_base_node->tab = (DbTable *) tb;
- return new_base_node_container;
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ copy_route_key(&p->u.base.lc_key, lc_key, lc_key_size);
+#endif
+ erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
+ "erl_db_catree_base_node",
+ lc_key,
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ p->u.base.lock_statistics = 0;
+ p->u.base.is_valid = 1;
+ return p;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode *create_wlocked_base_node(DbTableCATree *tb,
+ TreeDbTerm* root,
+ Eterm lc_key)
+{
+ DbTableCATreeNode* p = create_base_node(tb, root, lc_key);
+ ethr_rwmutex_rwlock(&p->u.base.lock.rwmtx);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_trylock_flg(-1, &p->u.base.lock.lc, ERTS_LOCK_OPTIONS_RDWR);
+#endif
+ return p;
+}
+
+
+static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
+{
+ return (offsetof(DbTableCATreeNode, u.route.key.heap)
+ + key_size*sizeof(Eterm));
}
static DbTableCATreeNode*
-create_catree_route_node(DbTableCommon * common_table_data,
- DbTableCATreeNode *left,
- DbTableCATreeNode *right,
- DbTerm * keyTerm)
+create_route_node(DbTableCATree *tb,
+ DbTableCATreeNode *left,
+ DbTableCATreeNode *right,
+ DbTerm * keyTerm,
+ DbTableCATreeNode* lc_parent)
{
- Eterm* top;
- Eterm key = GETKEY(common_table_data,keyTerm->tpl);
+ Eterm key = GETKEY(tb,keyTerm->tpl);
int key_size = size_object(key);
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- size_t route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- ErlOffHeap tmp_offheap;
- byte* new_route_node_container_bytes =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) common_table_data,
- route_node_container_size);
- DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
- DbTableCATreeNode *new_route_node_container =
- (DbTableCATreeNode*)new_route_node_container_bytes;
- DbTableCATreeRouteNode *new_route_node =
- &new_route_node_container->baseOrRoute.route;
- new_route_node->tab = (DbTable *)common_table_data;
- if (key_size != 0) {
- newp->size = key_size;
- top = &newp->tpl[1];
- tmp_offheap.first = NULL;
- newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
- newp->first_oh = tmp_offheap.first;
- } else {
- newp->size = key_size;
- newp->first_oh = NULL;
- newp->tpl[0] = key;
- }
- new_route_node_container->is_base_node = 0;
- new_route_node->is_valid = 1;
- erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
- erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
- erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
- NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
- return new_route_node_container;
+ DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof_route_node(key_size));
+
+ copy_route_key(&p->u.route.key, key, key_size);
+ p->is_base_node = 0;
+ p->u.route.is_valid = 1;
+ erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
+ erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ /* Route node lock order is inverse tree depth (from leafs toward root) */
+ p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
+ lc_parent->u.route.lc_order - 1);
+ /*
+ * This assert may eventually fail as we don't increase 'lc_order' in join
+ * operations when route nodes move up in the tree.
+ * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
+ */
+ ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
+#endif
+ erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
+ LC_ORDER(make_small(p->u.route.lc_order)),
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ return p;
}
-static void free_catree_base_node(void* base_node_container_ptr)
+static void do_free_base_node(void* vptr)
{
- DbTableCATreeNode *base_node_container =
- (DbTableCATreeNode *)base_node_container_ptr;
- DbTableCATreeBaseNode *base_node =
- &base_node_container->baseOrRoute.base;
- erts_rwmtx_destroy(&base_node->lock);
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- base_node->tab, base_node_container,
- sizeof(DbTableCATreeNode));
-}
-
-static void free_catree_routing_node(void *route_node_container_ptr)
-{
- size_t route_node_container_size;
- byte* route_node_container_bytes = route_node_container_ptr;
- DbTableCATreeNode *route_node_container =
- (DbTableCATreeNode *)route_node_container_bytes;
- DbTableCATreeRouteNode *route_node =
- &route_node_container->baseOrRoute.route;
- int key_size = route_node->key.size;
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- ErlOffHeap tmp_oh;
- DbTerm* db_term = (DbTerm*) (route_node_container_bytes + offset);
- erts_mtx_destroy(&route_node->lock);
- route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- if (key_size != 0) {
- tmp_oh.first = db_term->first_oh;
- erts_cleanup_offheap(&tmp_oh);
- }
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- route_node->tab,
- route_node_container,
- route_node_container_size);
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(p->is_base_node);
+ erts_rwmtx_destroy(&p->u.base.lock);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ destroy_route_key(&p->u.base.lc_key);
+#endif
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(p->u.base.lc_key.size), 0);
+ do_free_base_node(p);
+}
+
+static void do_free_route_node(void *vptr)
+{
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(!p->is_base_node);
+ erts_mtx_destroy(&p->u.route.lock);
+ destroy_route_key(&p->u.route.key);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
}
+static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(!p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
+ do_free_route_node(p);
+}
+
+
/*
* Returns the parent routing node of the specified
- * route_node_container if such a routing node exists or NULL if
- * route_node_container is attached to the root
+ * route node 'child' if such a parent exists
+ * or NULL if 'child' is attached to the root.
*/
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
- DbTableCATreeNode *route_node_container)
+ DbTableCATreeNode *child)
{
+ Eterm key = GET_ROUTE_NODE_KEY(child);
+ DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev = NULL;
- Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
- DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
- DbTableCATreeNode *prev_node = NULL;
- if (current_node == route_node_container) {
- return NULL;
- }
- while (current_node != route_node_container) {
- prev_node = current_node;
- if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
- current_node = GET_LEFT_ACQB(current_node);
+ while (current != child) {
+ prev = current;
+ if (cmp_key_route(key, current) < 0) {
+ current = GET_LEFT_ACQB(current);
} else {
- current_node = GET_RIGHT_ACQB(current_node);
+ current = GET_RIGHT_ACQB(current);
}
}
- return prev_node;
+ return prev;
}
@@ -953,11 +999,7 @@ leftmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_LEFT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -969,11 +1011,7 @@ rightmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_RIGHT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -988,8 +1026,7 @@ leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
}
static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCommon *common_table_data,
- DbTableCATreeNode *base_node,
+get_next_base_node_and_path(DbTableCATreeNode *base_node,
CATreeNodeStack *stack)
{
if (EMPTY_NODE(stack)) { /* The parent of b is the root */
@@ -1001,11 +1038,11 @@ get_next_base_node_and_path(DbTableCommon *common_table_data,
stack);
} else {
Eterm pkey =
- TOP_NODE(stack)->baseOrRoute.route.key.tpl[0]; /* pKey = key of parent */
+ TOP_NODE(stack)->u.route.key.term; /* pKey = key of parent */
POP_NODE(stack);
while (!EMPTY_NODE(stack)) {
- if (TOP_NODE(stack)->baseOrRoute.route.is_valid &&
- cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
+ if (TOP_NODE(stack)->u.route.is_valid &&
+ cmp_key_route(pkey, TOP_NODE(stack)) <= 0) {
return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
} else {
POP_NODE(stack);
@@ -1034,25 +1071,23 @@ lock_first_base_node(DbTable *tbl,
CATreeNodeStack *search_stack_ptr,
CATreeNodeStack *locked_base_nodes_stack_ptr)
{
- int retry;
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
+ while (1) {
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
current_node = GET_LEFT_ACQB(current_node);
}
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
+ if (base_node->is_valid)
+ break;
+ /* Retry */
+ runlock_base_node(base_node);
+ search_stack_ptr->pos = 0;
+ }
push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return current_node;
}
@@ -1066,19 +1101,20 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
CATreeNodeStack * tmp_stack_ptr;
- DbTableCommon* common_table_data;
- retry_find_and_lock_next_base_node:
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- common_table_data = &tbl->common;
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->baseOrRoute.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
+
+ while (1) {
+ current_node = TOP_NODE(locked_base_nodes_stack_ptr);
+ clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
+ current_node =
+ get_next_base_node_and_path(current_node, *search_stack_ptr_ptr);
+ if (current_node == NULL) {
+ return NULL;
+ }
+ base_node = &current_node->u.base;
+ rlock_base_node(base_node);
+ if (base_node->is_valid)
+ break;
+
/* Retry */
runlock_base_node(base_node);
/* Revert to previous state */
@@ -1086,10 +1122,9 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
tmp_stack_ptr = *search_stack_ptr_ptr;
*search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
*search_stack_copy_ptr_ptr = tmp_stack_ptr;
- goto retry_find_and_lock_next_base_node;
- } else {
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
}
+
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return base_node;
}
@@ -1102,7 +1137,7 @@ void unlock_and_release_locked_base_node_stack(DbTable *tbl,
int i;
for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
current_node = locked_base_nodes_stack_ptr->array[i];
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
if (locked_base_nodes_stack_ptr->pos > 1) {
base_node->lock_statistics = /* This is not atomic which is fine as */
base_node->lock_statistics + /* correctness does not depend on that. */
@@ -1166,19 +1201,18 @@ lock_base_node_with_key(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- DbTableCommon* common_table_data = &tbl->common;
do {
retry = 0;
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
+ if( cmp_key_route(key,current_node) < 0 ) {
current_node = GET_LEFT_ACQB(current_node);
} else {
current_node = GET_RIGHT_ACQB(current_node);
}
}
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
rlock_base_node(base_node);
if ( ! base_node->is_valid ) {
/* Retry */
@@ -1196,100 +1230,90 @@ lock_base_node_with_key(DbTable *tbl,
* node to join with.
*/
static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCommon *common_table_data,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container,
+erl_db_catree_force_join_right(DbTableCATree *tb,
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz,
DbTableCATreeNode **result_parent_wb)
{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- int neighbour_not_valid;
- if (parent_container == NULL) {
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+ TreeDbTerm* new_root;
+
+ if (parent == NULL) {
return NULL;
}
- parent = &parent_container->baseOrRoute.route;
- do {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- wlock_base_node_no_stats(neighbor_base);
- neighbour_not_valid = !neighbor_base->is_valid;
- if (neighbour_not_valid) {
- wunlock_base_node(neighbor_base);
- }
- } while (neighbour_not_valid);
+ ASSERT(thiz == GET_LEFT(parent));
+ while (1) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ wlock_base_node_no_stats(&neighbor->u.base);
+ if (neighbor->u.base.is_valid)
+ break;
+ wunlock_base_node(&neighbor->u.base);
+ }
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
- gparent = NULL;
- gparent_container = NULL;
- do {
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
- lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
+ while (1) {
+ gparent = parent_of(tb, parent);
+ if (gparent == NULL)
+ break;
+ lock_route_node(gparent);
+ if (gparent->u.route.is_valid)
+ break;
+ unlock_route_node(gparent);
+ }
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- wlock_base_node_no_stats(&(new_neighbor_base->baseOrRoute.base));
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+
+ new_root = join_trees(thiz->u.base.root, neighbor->u.base.root);
+ new_neighbor = create_wlocked_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
- if(neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if(neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-
- if (parent_container == neighbor_base_container) {
- *result_parent_wb = gparent_container;
- } else {
- *result_parent_wb = neighbor_base_parent;
- }
- return new_neighbor_base;
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node(thiz->u.base.lc_key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node(neighbor->u.base.lc_key.size));
+
+ *result_parent_wb = neighbor_parent;
+ return new_neighbor;
}
/*
@@ -1298,243 +1322,232 @@ erl_db_catree_force_join_right(DbTableCommon *common_table_data,
* locked state.
*/
static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCommon * common_table_data)
+merge_to_one_locked_base_node(DbTableCATree* tb)
{
- DbTableCATreeNode *parent_container;
- DbTableCATreeNode *new_parent_container;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *base_container;
- DbTableCATreeNode *new_base_container;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode *new_parent;
+ DbTableCATreeNode *base;
+ DbTableCATreeNode *new_base;
int is_not_valid;
/* Find first base node */
do {
- parent_container = NULL;
- base_container = GET_ROOT_ACQB(tb);
- while ( ! base_container->is_base_node ) {
- parent_container = base_container;
- base_container = GET_LEFT_ACQB(base_container);
+ parent = NULL;
+ base = GET_ROOT_ACQB(tb);
+ while ( ! base->is_base_node ) {
+ parent = base;
+ base = GET_LEFT_ACQB(base);
}
- wlock_base_node_no_stats(&(base_container->baseOrRoute.base));
- is_not_valid = ! base_container->baseOrRoute.base.is_valid;
+ wlock_base_node_no_stats(&(base->u.base));
+ is_not_valid = ! base->u.base.is_valid;
if (is_not_valid) {
- wunlock_base_node(&(base_container->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
}
} while(is_not_valid);
do {
- new_base_container = erl_db_catree_force_join_right(common_table_data,
- parent_container,
- base_container,
- &new_parent_container);
- if (new_base_container != NULL) {
- base_container = new_base_container;
- parent_container = new_parent_container;
+ new_base = erl_db_catree_force_join_right(tb,
+ parent,
+ base,
+ &new_parent);
+ if (new_base != NULL) {
+ base = new_base;
+ parent = new_parent;
}
- } while(new_base_container != NULL);
- return base_container;
+ } while(new_base != NULL);
+ return base;
}
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container)
-{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- if (parent_container == NULL) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz)
+{
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+
+ ASSERT(thiz->is_base_node);
+ if (parent == NULL) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
}
- parent = &parent_container->baseOrRoute.route;
- if (GET_LEFT(parent_container) == base_container) {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(!parent->is_base_node);
+ if (GET_LEFT(parent) == thiz) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL)
lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(thiz->u.base.root,
+ neighbor->u.base.root);
+ new_neighbor = create_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+ }
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
}
} else { /* Symetric case */
- neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(GET_RIGHT(parent) == thiz);
+ neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
lock_route_node(gparent);
} else {
gparent = NULL;
}
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_LEFT(parent_container));
- } else if (GET_RIGHT(gparent_container) == parent_container) {
- SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_ROOT_RELB(tb, GET_LEFT(parent));
+ } else if (GET_RIGHT(gparent) == parent) {
+ SET_RIGHT_RELB(gparent, GET_LEFT(parent));
} else {
- SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_LEFT_RELB(gparent, GET_LEFT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(neighbor_base->root, base->root);
- neighbor_base_parent = NULL;
- if (GET_LEFT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
+ thiz->u.base.root);
+ new_neighbor = create_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+ }
+ if (GET_LEFT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- rightmost_route_node(GET_LEFT(parent_container));
+ neighbor_parent =
+ rightmost_route_node(GET_LEFT(parent));
}
}
}
/* Link in new neighbor and free nodes that are no longer in the tree */
- if (neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if (neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-}
-
-
-static void split_catree(DbTableCommon *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container) {
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node(thiz->u.base.lc_key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node(neighbor->u.base.lc_key.size));
+}
+
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT parent,
+ DbTableCATreeNode* ERTS_RESTRICT base) {
TreeDbTerm *splitOutWriteBack;
- TreeDbTerm *leftWriteBack;
- TreeDbTerm *rightWriteBack;
- DbTableCATreeNode *left_base_node;
- DbTableCATreeNode *right_base_node;
- DbTableCATreeNode *routing_node_container;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeRouteNode *parent;
- if (parent_container == NULL) {
- parent = NULL;
- } else {
- parent = &parent_container->baseOrRoute.route;
- }
+ DbTableCATreeNode* ERTS_RESTRICT new_left;
+ DbTableCATreeNode* ERTS_RESTRICT new_right;
+ DbTableCATreeNode* ERTS_RESTRICT new_route;
- if (less_than_two_elements(base->root)) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ if (less_than_two_elements(base->u.base.root)) {
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(&base->u.base);
return;
} else {
- /* Split the tree */
- split_tree(tb,
- base->root,
- &splitOutWriteBack,
- &leftWriteBack,
- &rightWriteBack);
- /* Create new base nodes */
- left_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- right_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- left_base_node->baseOrRoute.base.root = leftWriteBack;
- right_base_node->baseOrRoute.base.root = rightWriteBack;
- routing_node_container = create_catree_route_node(tb,
- left_base_node,
- right_base_node,
- &splitOutWriteBack->dbterm);
+ TreeDbTerm *left_tree;
+ TreeDbTerm *right_tree;
+
+ split_tree(tb, base->u.base.root, &splitOutWriteBack,
+ &left_tree, &right_tree);
+
+ new_left = create_base_node(tb, left_tree,
+ LC_ORDER(GETKEY(tb, left_tree->dbterm.tpl)));
+ new_right = create_base_node(tb, right_tree,
+ LC_ORDER(GETKEY(tb, right_tree->dbterm.tpl)));
+ new_route = create_route_node(tb,
+ new_left,
+ new_right,
+ &splitOutWriteBack->dbterm,
+ parent);
if (parent == NULL) {
- SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
- } else if(GET_LEFT(parent_container) == base_container) {
- SET_LEFT_RELB(parent_container, routing_node_container);
+ SET_ROOT_RELB(tb, new_route);
+ } else if(GET_LEFT(parent) == base) {
+ SET_LEFT_RELB(parent, new_route);
} else {
- SET_RIGHT_RELB(parent_container, routing_node_container);
+ SET_RIGHT_RELB(parent, new_route);
}
- base->is_valid = 0;
- wunlock_base_node(base);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
+ base->u.base.is_valid = 0;
+ wunlock_base_node(&base->u.base);
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ base,
+ &base->u.base.free_item,
+ sizeof_base_node(base->u.base.lc_key.size));
}
}
@@ -1546,7 +1559,7 @@ static void catree_add_base_node_to_free_list(
DbTableCATree *tb,
DbTableCATreeNode *base_node_container)
{
- base_node_container->baseOrRoute.base.next =
+ base_node_container->u.base.next =
tb->base_nodes_to_free_list;
tb->base_nodes_to_free_list = base_node_container;
}
@@ -1557,7 +1570,7 @@ static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
return; /* List empty */
} else {
DbTableCATreeNode *first = tb->base_nodes_to_free_list;
- tb->base_nodes_to_free_list = first->baseOrRoute.base.next;
+ tb->base_nodes_to_free_list = first->u.base.next;
}
}
@@ -1596,7 +1609,7 @@ static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left
PUSH_NODE(&tb->free_stack_rnodes, root);
root = p;
} else {
- free_catree_routing_node(root);
+ free_catree_route_node(tb, root);
if (--num_left >= 0) {
break;
} else {
@@ -1637,10 +1650,10 @@ static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
}
}
catree_deque_base_node_from_free_list(tb);
- free_catree_base_node(base_node_container);
+ free_catree_base_node(tb, base_node_container);
base_node_container = catree_first_base_node_from_free_list(tb);
if (base_node_container != NULL) {
- PUSH_NODE(&tb->free_stack_elems, base_node_container->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root);
}
return num_left;
}
@@ -1662,7 +1675,7 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_catree_base_node(tb);
+ DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1675,7 +1688,7 @@ static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
int result;
DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
/* Find first base node */
- base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->baseOrRoute.base);
+ base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base);
/* Find next base node until non-empty base node is found */
while (base_node != NULL && base_node->root == NULL) {
base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
@@ -1716,10 +1729,9 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- int result = db_last_tree_common(p, tbl, base->baseOrRoute.base.root, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1728,13 +1740,12 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_prev_tree_common(p, tbl, base->baseOrRoute.base.root, key, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1835,13 +1846,12 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_slot_tree_common(p, tbl, base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_slot_tree_common(p, tbl, base->u.base.root,
slot_term, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1850,13 +1860,12 @@ static int db_select_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_continue_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1864,14 +1873,13 @@ static int db_select_continue_catree(Process *p,
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, reverse, ret,
NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1880,27 +1888,25 @@ static int db_select_count_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_continue_tree_common(p, &tbl->common,
+ &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1908,13 +1914,12 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reversed, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_chunk_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1923,60 +1928,56 @@ static int db_select_delete_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_continue_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
continuation, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
tid, pattern, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_tree_common(p, &tbl->common,
+ &base->u.base.root,
tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_continue_tree_common(p, &tbl->common,
+ &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -2002,10 +2003,9 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_print_tree_common(to, to_arg, show, base->baseOrRoute.base.root, tbl);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
+ wunlock_base_node(&(base->u.base));
}
/* Release all memory occupied by a single table */
@@ -2045,7 +2045,7 @@ static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
} else {
tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
first_base_node = catree_first_base_node_from_free_list(tb);
- PUSH_NODE(&tb->free_stack_elems, first_base_node->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root);
}
}
while (catree_first_base_node_from_free_list(tb) != NULL) {
@@ -2081,10 +2081,9 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_foreach_offheap_tree_common(base->baseOrRoute.base.root, func, arg);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ db_foreach_offheap_tree_common(base->u.base.root, func, arg);
+ wunlock_base_node(&(base->u.base));
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
@@ -2111,7 +2110,7 @@ static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
DbTableCATreeNode *prev_node = handle->lck;
DbTableCATreeNode *current_node = handle->lck2;
int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->baseOrRoute.base;
+ DbTableCATreeBaseNode *base_node = &current_node->u.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
return;