aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c1414
1 files changed, 733 insertions, 681 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index a8e48bce1b..b52a4a53fe 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -96,6 +96,12 @@
#include "erl_db_tree.h"
#include "erl_db_tree_util.h"
+#ifdef DEBUG
+# define IF_DEBUG(X) X
+#else
+# define IF_DEBUG(X)
+#endif
+
/*
** Forward declarations
*/
@@ -158,6 +164,14 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent);
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent);
+
+
/*
** External interface
*/
@@ -241,30 +255,6 @@ static ERTS_INLINE Sint cmp_key_route(Eterm key,
return CMP(key, GET_ROUTE_NODE_KEY(obj));
}
-static ERTS_INLINE void push_node_dyn_array(DbTable *tb,
- CATreeNodeStack *stack,
- DbTableCATreeNode *node)
-{
- int i;
- if (stack->pos == stack->size) {
- DbTableCATreeNode **newArray =
- erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
- sizeof(DbTableCATreeNode*) * (stack->size*2));
- for (i = 0; i < stack->pos; i++) {
- newArray[i] = stack->array[i];
- }
- if (stack->size > STACK_NEED) {
- /* Dynamically allocated array that needs to be deallocated */
- erts_db_free(ERTS_ALC_T_DB_STK, tb,
- stack->array,
- sizeof(DbTableCATreeNode *) * stack->size);
- }
- stack->array = newArray;
- stack->size = stack->size*2;
- }
- PUSH_NODE(stack, node);
-}
-
/*
* Used by the split_tree function
*/
@@ -578,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
p = *this;
if (dir == DIR_LEFT) {
switch (p->balance) {
- case 1:
+ case 1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = -1;
break;
- case -1: /* The icky case */
+ case -1: /* The icky case */
p1 = p->left;
if (p1->balance == -1) { /* Single LL rotation */
p->left = p1->right;
@@ -608,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
} else { /* dir == DIR_RIGHT */
switch (p->balance) {
- case -1:
+ case -1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = 1;
break;
- case 1:
+ case 1:
p1 = p->right;
if (p1->balance == 1) { /* Single RR rotation */
p->right = p1->left;
@@ -646,6 +636,31 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
}
+#ifdef DEBUG
+# define FORCE_RANDOM_SPLIT_JOIN
+#endif
+#ifdef FORCE_RANDOM_SPLIT_JOIN
+static int dbg_fastrand(void)
+{
+ static int g_seed = 648835;
+ g_seed = (214013*g_seed+2531011);
+ return (g_seed>>16)&0x7FFF;
+}
+
+static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node)
+{
+ switch (dbg_fastrand() % 8) {
+ case 1:
+ base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT;
+ break;
+ case 2:
+ base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT;
+ break;
+ }
+}
+#else
+# define dbg_maybe_force_splitjoin(N)
+#endif /* FORCE_RANDOM_SPLIT_JOIN */
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
@@ -657,9 +672,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
* Locks a base node without adjusting the lock statistics
*/
static ERTS_INLINE
-void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
+void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rwlock(&base_node->u.base.lock);
}
/*
@@ -667,33 +683,57 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
* the lock was contended or not
*/
static ERTS_INLINE
-void wlock_base_node(DbTableCATreeBaseNode *base_node)
+void wlock_base_node(DbTableCATreeNode *base_node)
{
- if (try_wlock_base_node(base_node)) {
+ ASSERT(base_node->is_base_node);
+ if (try_wlock_base_node(&base_node->u.base)) {
/* The lock is contended */
wlock_base_node_no_stats(base_node);
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
} else {
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
}
}
static ERTS_INLINE
-void wunlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwunlock(&base_node->lock);
+ erts_rwmtx_rwunlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void rlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_adapt_base_node(DbTableCATree* tb,
+ DbTableCATreeNode* node,
+ DbTableCATreeNode* parent,
+ int current_level)
+{
+ dbg_maybe_force_splitjoin(node);
+ if ((!node->u.base.root && parent && !(tb->common.status
+ & DB_CATREE_FORCE_SPLIT))
+ || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
+ join_catree(tb, node, parent);
+ }
+ else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
+ split_catree(tb, node, parent);
+ }
+ else {
+ wunlock_base_node(node);
+ }
+}
+
+static ERTS_INLINE
+void rlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void runlock_base_node(DbTableCATreeBaseNode *base_node)
+void runlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_runlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_runlock(&base_node->u.base.lock);
}
static ERTS_INLINE
@@ -710,80 +750,121 @@ void unlock_route_node(DbTableCATreeNode *route_node)
erts_mtx_unlock(&route_node->u.route.lock);
}
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+ iter->search_key = NULL;
+}
-/*
- * The following macros are used to create the ETS functions that only
- * need to access one element (e.g. db_put_catree, db_get_catree and
- * db_erase_catree).
- */
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeNode *base_node,
+ DbTableCATreeNode *parent,
+ int current_level)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else {
+ wlock_base_node(base_node);
+ iter->bnode_parent = parent;
+ iter->bnode_level = current_level;
+ }
+ iter->locked_bnode = base_node;
+}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
- int retry; \
- DbTableCATreeNode *current_node; \
- DbTableCATreeNode *prev_node; \
- DbTableCATreeBaseNode *base_node; \
- int current_level; \
- (void)prev_node; \
- do { \
- retry = 0; \
- current_node = GET_ROOT_ACQB(tb); \
- prev_node = NULL; \
- current_level = 0; \
- while ( ! current_node->is_base_node ) { \
- current_level = current_level + 1; \
- prev_node = current_node; \
- if (cmp_key_route(key,current_node) < 0) { \
- current_node = GET_LEFT_ACQB(current_node); \
- } else { \
- current_node = GET_RIGHT_ACQB(current_node); \
- } \
- } \
- base_node = &current_node->u.base; \
- LOCK(base_node); \
- if ( ! base_node->is_valid ) { \
- /* Retry */ \
- UNLOCK(base_node); \
- retry = 1; \
- } \
- } while(retry);
-
-
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
- && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(tb, prev_node, current_node); \
- } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
- join_catree(tb, prev_node, current_node); \
- } else { \
- wunlock_base_node(base_node); \
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else if (iter->locked_bnode->u.base.is_valid) {
+ wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
+ iter->bnode_parent, iter->bnode_level);
}
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+static ERTS_INLINE
+void destroy_root_iterator(CATreeRootIterator* iter)
+{
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+ if (iter->search_key)
+ erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
+}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
- result = SEQUENTAIL_CALL; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- return result; \
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
+
+static ERTS_INLINE
+DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ if (fbn) {
+ fbn->parent = NULL;
+ fbn->current_level = 0;
}
+ while (!node->is_base_node) {
+ if (fbn) {
+ fbn->current_level++;
+ fbn->parent = node;
+ }
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return node;
+}
+static ERTS_INLINE
+DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
+{
+ DbTableCATreeNode* base_node;
-#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
- result = SEQUENTAIL_CALL; \
- runlock_base_node(base_node); \
- return result; \
+ while (1) {
+ base_node = find_base_node(tb, key, NULL);
+ rlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ runlock_base_node(base_node);
}
+ return base_node;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* base_node;
+ while (1) {
+ base_node = find_base_node(tb, key, fbn);
+ wlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ wunlock_base_node(base_node);
+ }
+ return base_node;
+}
static ERTS_INLINE
-void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
{
dst->size = key_size;
if (key_size != 0) {
@@ -798,6 +879,7 @@ void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
dst->term = key;
dst->oh = NULL;
}
+ return dst->term;
}
static ERTS_INLINE
@@ -812,27 +894,21 @@ void destroy_route_key(DbRouteKey* key)
#ifdef ERTS_ENABLE_LOCK_CHECK
-# define sizeof_base_node(KEY_SZ) \
- (offsetof(DbTableCATreeNode, u.base.lc_key.heap) \
- + (KEY_SZ)*sizeof(Eterm))
# define LC_ORDER(ORDER) ORDER
#else
-# define sizeof_base_node(KEY_SZ) \
- offsetof(DbTableCATreeNode, u.base.end_of_struct__)
# define LC_ORDER(ORDER) NIL
#endif
+#define sizeof_base_node() \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+
static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
- TreeDbTerm* root,
- Eterm lc_key)
+ TreeDbTerm* root)
{
DbTableCATreeNode *p;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- Eterm lc_key_size = size_object(lc_key);
-#endif
p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
- sizeof_base_node(lc_key_size));
+ sizeof_base_node());
p->is_base_node = 1;
p->u.base.root = root;
@@ -841,32 +917,16 @@ static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- copy_route_key(&p->u.base.lc_key, lc_key, lc_key_size);
-#endif
erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
"erl_db_catree_base_node",
- lc_key,
+ NIL,
ERTS_LOCK_FLAGS_CATEGORY_DB);
- p->u.base.lock_statistics = 0;
+ p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT)
+ ? INT_MAX : 0);
p->u.base.is_valid = 1;
return p;
}
-static ERTS_INLINE
-DbTableCATreeNode *create_wlocked_base_node(DbTableCATree *tb,
- TreeDbTerm* root,
- Eterm lc_key)
-{
- DbTableCATreeNode* p = create_base_node(tb, root, lc_key);
- ethr_rwmutex_rwlock(&p->u.base.lock.rwmtx);
-#ifdef ERTS_ENABLE_LOCK_CHECK
- erts_lc_trylock_flg(-1, &p->u.base.lock.lc, ERTS_LOCK_OPTIONS_RDWR);
-#endif
- return p;
-}
-
-
static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
{
return (offsetof(DbTableCATreeNode, u.route.key.heap)
@@ -913,16 +973,13 @@ static void do_free_base_node(void* vptr)
DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
ASSERT(p->is_base_node);
erts_rwmtx_destroy(&p->u.base.lock);
-#ifdef ERTS_ENABLE_LOCK_CHECK
- destroy_route_key(&p->u.base.lc_key);
-#endif
erts_free(ERTS_ALC_T_DB_TABLE, p);
}
static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
ASSERT(p->is_base_node);
- ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(p->u.base.lc_key.size), 0);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
do_free_base_node(p);
}
@@ -1014,154 +1071,6 @@ rightmost_route_node(DbTableCATreeNode *root)
return prev_node;
}
-static ERTS_INLINE DbTableCATreeNode*
-leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
-{
- DbTableCATreeNode * node = root;
- while (!node->is_base_node) {
- PUSH_NODE(stack, node);
- node = GET_LEFT_ACQB(node);
- }
- return node;
-}
-
-static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCATreeNode *base_node,
- CATreeNodeStack *stack)
-{
- if (EMPTY_NODE(stack)) { /* The parent of b is the root */
- return NULL;
- } else {
- if (GET_LEFT(TOP_NODE(stack)) == base_node) {
- return leftmost_base_node_and_path(
- GET_RIGHT_ACQB(TOP_NODE(stack)),
- stack);
- } else {
- Eterm pkey =
- TOP_NODE(stack)->u.route.key.term; /* pKey = key of parent */
- POP_NODE(stack);
- while (!EMPTY_NODE(stack)) {
- if (TOP_NODE(stack)->u.route.is_valid &&
- cmp_key_route(pkey, TOP_NODE(stack)) <= 0) {
- return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
- } else {
- POP_NODE(stack);
- }
- }
- }
- return NULL;
- }
-}
-
-static ERTS_INLINE void
-clone_stack(CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *search_stack_copy_ptr)
-{
- int i;
- search_stack_copy_ptr->pos = search_stack_ptr->pos;
- for (i = 0; i < search_stack_ptr->pos; i++) {
- search_stack_copy_ptr->array[i] = search_stack_ptr->array[i];
- }
-}
-
-
-
-static ERTS_INLINE DbTableCATreeNode*
-lock_first_base_node(DbTable *tbl,
- CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- while (1) {
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- current_node = GET_LEFT_ACQB(current_node);
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if (base_node->is_valid)
- break;
- /* Retry */
- runlock_base_node(base_node);
- search_stack_ptr->pos = 0;
- }
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return current_node;
-}
-
-static ERTS_INLINE DbTableCATreeBaseNode*
-find_and_lock_next_base_node_and_path(DbTable *tbl,
- CATreeNodeStack **search_stack_ptr_ptr,
- CATreeNodeStack **search_stack_copy_ptr_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- CATreeNodeStack * tmp_stack_ptr;
-
- while (1) {
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if (base_node->is_valid)
- break;
-
- /* Retry */
- runlock_base_node(base_node);
- /* Revert to previous state */
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- tmp_stack_ptr = *search_stack_ptr_ptr;
- *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
- *search_stack_copy_ptr_ptr = tmp_stack_ptr;
- }
-
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return base_node;
-}
-
-static ERTS_INLINE
-void unlock_and_release_locked_base_node_stack(DbTable *tbl,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- int i;
- for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
- current_node = locked_base_nodes_stack_ptr->array[i];
- base_node = &current_node->u.base;
- if (locked_base_nodes_stack_ptr->pos > 1) {
- base_node->lock_statistics = /* This is not atomic which is fine as */
- base_node->lock_statistics + /* correctness does not depend on that. */
- ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION;
- }
- runlock_base_node(base_node);
- }
- if (locked_base_nodes_stack_ptr->size > STACK_NEED) {
- erts_db_free(ERTS_ALC_T_DB_STK, tbl,
- locked_base_nodes_stack_ptr->array,
- sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size);
- }
-}
-
-static ERTS_INLINE
-void init_stack(CATreeNodeStack *stack,
- DbTableCATreeNode **stack_array,
- Uint init_size)
-{
- stack->array = stack_array;
- stack->pos = 0;
- stack->size = init_size;
-}
-
static ERTS_INLINE
void init_tree_stack(DbTreeStack *stack,
TreeDbTerm **stack_array,
@@ -1172,194 +1081,9 @@ void init_tree_stack(DbTreeStack *stack,
stack->slot = init_slot;
}
-#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \
- CATreeNodeStack STACK_NAME; \
- CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \
- DbTableCATreeNode *STACK_NAME##_array[STACK_NEED];
-
-#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \
-init_stack(&search_stack, search_stack_array, 0); \
-init_stack(&search_stack_copy, search_stack_copy_array, 0); \
-init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/
-
-/*
- * Locks and returns the base node that contains the specified key if
- * it is present. The function saves the search path to the found base
- * node in search_stack_ptr and adds the found base node to
- * locked_base_nodes_stack_ptr.
- */
-static ERTS_INLINE DbTableCATreeBaseNode *
-lock_base_node_with_key(DbTable *tbl,
- Eterm key,
- CATreeNodeStack * search_stack_ptr,
- CATreeNodeStack * locked_base_nodes_stack_ptr)
-{
- int retry;
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(key,current_node) < 0 ) {
- current_node = GET_LEFT_ACQB(current_node);
- } else {
- current_node = GET_RIGHT_ACQB(current_node);
- }
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return base_node;
-}
-
-/*
- * Joins a base node with it's right neighbor. Returns the base node
- * resulting from the join in locked state or NULL if there is no base
- * node to join with.
- */
-static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCATree *tb,
- DbTableCATreeNode *parent,
- DbTableCATreeNode *thiz,
- DbTableCATreeNode **result_parent_wb)
-{
- DbTableCATreeNode *gparent;
- DbTableCATreeNode *neighbor;
- DbTableCATreeNode *new_neighbor;
- DbTableCATreeNode *neighbor_parent;
- TreeDbTerm* new_root;
-
- if (parent == NULL) {
- return NULL;
- }
- ASSERT(thiz == GET_LEFT(parent));
- while (1) {
- neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
- wlock_base_node_no_stats(&neighbor->u.base);
- if (neighbor->u.base.is_valid)
- break;
- wunlock_base_node(&neighbor->u.base);
- }
- lock_route_node(parent);
- parent->u.route.is_valid = 0;
- neighbor->u.base.is_valid = 0;
- thiz->u.base.is_valid = 0;
- while (1) {
- gparent = parent_of(tb, parent);
- if (gparent == NULL)
- break;
- lock_route_node(gparent);
- if (gparent->u.route.is_valid)
- break;
- unlock_route_node(gparent);
- }
- if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent));
- } else if (GET_LEFT(gparent) == parent) {
- SET_LEFT_RELB(gparent, GET_RIGHT(parent));
- } else {
- SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
- }
- unlock_route_node(parent);
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
-
- new_root = join_trees(thiz->u.base.root, neighbor->u.base.root);
- new_neighbor = create_wlocked_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
-
- if (GET_RIGHT(parent) == neighbor) {
- neighbor_parent = gparent;
- } else {
- neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
- }
- if(neighbor_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor);
- } else if (GET_LEFT(neighbor_parent) == neighbor) {
- SET_LEFT_RELB(neighbor_parent, new_neighbor);
- } else {
- SET_RIGHT_RELB(neighbor_parent, new_neighbor);
- }
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
- /* Free the parent and base */
- erts_schedule_db_free(&tb->common,
- do_free_route_node,
- parent,
- &parent->u.route.free_item,
- sizeof_route_node(parent->u.route.key.size));
- erts_schedule_db_free(&tb->common,
- do_free_base_node,
- thiz,
- &thiz->u.base.free_item,
- sizeof_base_node(thiz->u.base.lc_key.size));
- erts_schedule_db_free(&tb->common,
- do_free_base_node,
- neighbor,
- &neighbor->u.base.free_item,
- sizeof_base_node(neighbor->u.base.lc_key.size));
-
- *result_parent_wb = neighbor_parent;
- return new_neighbor;
-}
-
-/*
- * Used to merge together all base nodes for operations such as last
- * and select_*. Returns the base node resulting from the merge in
- * locked state.
- */
-static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCATree* tb)
-{
- DbTableCATreeNode *parent;
- DbTableCATreeNode *new_parent;
- DbTableCATreeNode *base;
- DbTableCATreeNode *new_base;
- int is_not_valid;
- /* Find first base node */
- do {
- parent = NULL;
- base = GET_ROOT_ACQB(tb);
- while ( ! base->is_base_node ) {
- parent = base;
- base = GET_LEFT_ACQB(base);
- }
- wlock_base_node_no_stats(&(base->u.base));
- is_not_valid = ! base->u.base.is_valid;
- if (is_not_valid) {
- wunlock_base_node(&(base->u.base));
- }
- } while(is_not_valid);
- do {
- new_base = erl_db_catree_force_join_right(tb,
- parent,
- base,
- &new_parent);
- if (new_base != NULL) {
- base = new_base;
- parent = new_parent;
- }
- } while(new_base != NULL);
- return base;
-}
-
-
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent,
- DbTableCATreeNode *thiz)
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent)
{
DbTableCATreeNode *gparent;
DbTableCATreeNode *neighbor;
@@ -1369,7 +1093,7 @@ static void join_catree(DbTableCATree *tb,
ASSERT(thiz->is_base_node);
if (parent == NULL) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
}
ASSERT(!parent->is_base_node);
@@ -1378,12 +1102,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1414,8 +1138,7 @@ static void join_catree(DbTableCATree *tb,
{
TreeDbTerm* new_root = join_trees(thiz->u.base.root,
neighbor->u.base.root);
- new_neighbor = create_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
+ new_neighbor = create_base_node(tb, new_root);
}
if (GET_RIGHT(parent) == neighbor) {
neighbor_parent = gparent;
@@ -1429,12 +1152,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1467,8 +1190,7 @@ static void join_catree(DbTableCATree *tb,
{
TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
thiz->u.base.root);
- new_neighbor = create_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
+ new_neighbor = create_base_node(tb, new_root);
}
if (GET_LEFT(parent) == neighbor) {
neighbor_parent = gparent;
@@ -1486,8 +1208,8 @@ static void join_catree(DbTableCATree *tb,
} else {
SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
/* Free the parent and base */
erts_schedule_db_free(&tb->common,
do_free_route_node,
@@ -1498,25 +1220,27 @@ static void join_catree(DbTableCATree *tb,
do_free_base_node,
thiz,
&thiz->u.base.free_item,
- sizeof_base_node(thiz->u.base.lc_key.size));
+ sizeof_base_node());
erts_schedule_db_free(&tb->common,
do_free_base_node,
neighbor,
&neighbor->u.base.free_item,
- sizeof_base_node(neighbor->u.base.lc_key.size));
+ sizeof_base_node());
}
static void split_catree(DbTableCATree *tb,
- DbTableCATreeNode* ERTS_RESTRICT parent,
- DbTableCATreeNode* ERTS_RESTRICT base) {
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent)
+{
TreeDbTerm *splitOutWriteBack;
DbTableCATreeNode* ERTS_RESTRICT new_left;
DbTableCATreeNode* ERTS_RESTRICT new_right;
DbTableCATreeNode* ERTS_RESTRICT new_route;
if (less_than_two_elements(base->u.base.root)) {
- base->u.base.lock_statistics = 0;
- wunlock_base_node(&base->u.base);
+ if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(base);
return;
} else {
TreeDbTerm *left_tree;
@@ -1525,10 +1249,8 @@ static void split_catree(DbTableCATree *tb,
split_tree(tb, base->u.base.root, &splitOutWriteBack,
&left_tree, &right_tree);
- new_left = create_base_node(tb, left_tree,
- LC_ORDER(GETKEY(tb, left_tree->dbterm.tpl)));
- new_right = create_base_node(tb, right_tree,
- LC_ORDER(GETKEY(tb, right_tree->dbterm.tpl)));
+ new_left = create_base_node(tb, left_tree);
+ new_right = create_base_node(tb, right_tree);
new_route = create_route_node(tb,
new_left,
new_right,
@@ -1542,12 +1264,12 @@ static void split_catree(DbTableCATree *tb,
SET_RIGHT_RELB(parent, new_route);
}
base->u.base.is_valid = 0;
- wunlock_base_node(&base->u.base);
+ wunlock_base_node(base);
erts_schedule_db_free(&tb->common,
do_free_base_node,
base,
&base->u.base.free_item,
- sizeof_base_node(base->u.base.lc_key.size));
+ sizeof_base_node());
}
}
@@ -1675,7 +1397,9 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL);
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1684,162 +1408,416 @@ int db_create_catree(Process *p, DbTable *tbl)
static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATreeBaseNode *base_node;
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
int result;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- /* Find first base node */
- base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base);
- /* Find next base node until non-empty base node is found */
- while (base_node != NULL && base_node->root == NULL) {
- base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_first_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
+ root = pp ? *pp : NULL;
}
- /* Get the return value */
- result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL);
- /* Unlock base nodes */
- unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr);
+
+ result = db_first_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
- DbTreeStack next_search_stack;
- TreeDbTerm *next_search_stack_array[STACK_NEED];
- DbTableCATreeBaseNode *base_node;
- int result = 0;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- init_tree_stack(&next_search_stack, next_search_stack_array, 0);
- /* Lock base node with key */
- base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack);
- /* Continue until key is not >= greatest key in base_node */
- while (base_node != NULL) {
- result = db_next_tree_common(p, tbl, base_node->root, key,
- ret, &next_search_stack);
- if (result != DB_ERROR_NONE || *ret != am_EOT) {
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_next_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
break;
- }
- base_node =
- find_and_lock_next_base_node_and_path(tbl,
- &search_stack_ptr,
- &search_stack_copy_ptr,
- locked_base_nodes_stack_ptr);
- }
- unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack);
+
+ rootp = catree_find_next_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_last_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
+ root = pp ? *pp : NULL;
+ }
+
+ result = db_last_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
-
}
static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
int result;
- DbTableCATreeNode *base;
- init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
- wunlock_base_node(&(base->u.base));
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_prev_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
+ &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
+ break;
+ rootp = catree_find_prev_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
- ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
- db_put_tree_common(&tb->common,
- &base_node->root,
- obj,
- key_clash_fail,
- NULL);)
-
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(obj));
- return erl_db_catree_do_operation_put(tb,
- key,
- obj,
- key_clash_fail);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
+ key_clash_fail, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
- ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
- db_get_tree_common(p, &tb->common,
- base_node->root,
- key, ret, NULL);)
-
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get(tb, key, p, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
+ if (base_node->u.base.is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->u.base.root;
+}
+
+
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static Eterm copy_iter_search_key(CATreeRootIterator* iter, Eterm key)
+{
+ Uint key_size;
+
+ if (is_immed(key))
+ return key;
+
+ if (iter->search_key) {
+ ASSERT(key != iter->search_key->term);
+ destroy_route_key(iter->search_key);
+ }
+ key_size = size_object(key);
+ if (!iter->search_key || key_size > iter->search_key->size) {
+ iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
+ iter->search_key,
+ (offsetof(DbRouteKey, heap)
+ + key_size*sizeof(Eterm)));
+ }
+ return copy_route_key(iter->search_key, key, key_size);
}
-#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
- ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
- db_member_tree_common(&tb->common,
- base_node->root,
- key,
- ret,
- NULL);)
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next,
+ Eterm *keyp)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_invalid = NULL;
+ DbTableCATreeNode *rejected_empty = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ Eterm key = iter->next_route_key;
+ int current_level;
+
+ if (iter->locked_bnode) {
+ if (keyp)
+ *keyp = copy_iter_search_key(iter, *keyp);
+ unlock_iter_base_node(iter);
+ }
+
+ if (is_non_value(key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (next) {
+ if (cmp_key_route(key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_invalid);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ ASSERT(node != rejected_empty);
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = node;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ key = next_route_node->u.route.key.term;
+ IF_DEBUG(rejected_empty = node);
+ }
+ else
+ IF_DEBUG(rejected_invalid = node);
+
+ /* Retry */
+ unlock_iter_base_node(iter);
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 1, keyp);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 0, keyp);
+}
+
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, next_route_node, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_member(tb, key, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_member_tree_common(&tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
- ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
- db_get_element_tree_common(p, &tb->common,
- base_node->root,
- key, ndex,
- ret, NULL))
-
static int db_get_element_catree(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_element_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ndex, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
- ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
- db_erase_tree_common(tbl,
- &base_node->root,
- key,
- ret,
- NULL);)
-
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_tree_common(tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
- ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
- db_erase_object_tree_common(tbl,
- &base_node->root,
- object,
- ret,
- NULL);)
-
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(object));
- return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_object_tree_common(tbl,
+ &node->u.base.root,
+ object,
+ ret,
+ NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
@@ -1847,11 +1825,12 @@ static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_slot_tree_common(p, tbl, base->u.base.root,
- slot_term, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
+ slot_term, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1861,25 +1840,25 @@ static int db_select_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, reverse, ret,
- NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1889,12 +1868,13 @@ static int db_select_count_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1902,11 +1882,12 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1915,11 +1896,13 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
int reversed, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1931,12 +1914,13 @@ static int db_select_delete_continue_catree(Process *p,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
- continuation, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1946,12 +1930,14 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
- tid, pattern, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1959,12 +1945,12 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_tree_common(p, &tbl->common,
- &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1972,26 +1958,24 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
- ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
- db_take_tree_common(p, tbl,
- &base_node->root,
- key, ret, NULL);)
-
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
/*
@@ -2003,9 +1987,16 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_print_tree_common(to, to_arg, show, *root, tbl);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
/* Release all memory occupied by a single table */
@@ -2081,25 +2072,33 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- db_foreach_offheap_tree_common(base->u.base.root, func, arg);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_foreach_offheap_tree_common(*root, func, arg);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle *handle)
{
DbTableCATree *tb = &tbl->catree;
- int res;
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
- res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
+ obj, handle, NULL);
if (res == 0) {
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
} else {
/* db_finalize_dbterm_catree will unlock */
- handle->lck = prev_node;
- handle->lck2 = current_node;
- handle->current_level = current_level;
+ handle->u.catree.base_node = node;
+ handle->u.catree.parent = fbn.parent;
+ handle->u.catree.current_level = fbn.current_level;
}
return res;
}
@@ -2107,12 +2106,10 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
DbTableCATree *tb = &(handle->tb->catree);
- DbTableCATreeNode *prev_node = handle->lck;
- DbTableCATreeNode *current_node = handle->lck2;
- int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->u.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, handle->u.catree.base_node,
+ handle->u.catree.parent,
+ handle->u.catree.current_level);
return;
}
@@ -2146,6 +2143,61 @@ void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
+void db_catree_force_split(DbTableCATree* tb, int on)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(tb, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+
+ if (on)
+ tb->common.status |= DB_CATREE_FORCE_SPLIT;
+ else
+ tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
+}
+
+void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
+{
+ DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
+ DbTableCATreeNode* node;
+ Uint depth = 0;
+
+ stats->route_nodes = 0;
+ stats->base_nodes = 0;
+ stats->max_depth = 0;
+
+ node = GET_ROOT(tb);
+ do {
+ while (!node->is_base_node) {
+ stats->route_nodes++;
+ ASSERT(depth < sizeof(stack)/sizeof(*stack));
+ stack[depth++] = node; /* PUSH parent */
+ if (stats->max_depth < depth)
+ stats->max_depth = depth;
+ node = GET_LEFT(node);
+ }
+ stats->base_nodes++;
+
+ while (depth > 0) {
+ DbTableCATreeNode* parent = stack[depth-1];
+ if (node == GET_LEFT(parent)) {
+ node = GET_RIGHT(parent);
+ break;
+ }
+ else {
+ ASSERT(node == GET_RIGHT(parent));
+ node = parent;
+ depth--; /* POP parent */
+ }
+ }
+ } while (depth > 0);
+}
#ifdef HARDDEBUG