aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c1939
1 files changed, 1004 insertions, 935 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index 37a299df35..639c7e5e03 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -96,6 +96,12 @@
#include "erl_db_tree.h"
#include "erl_db_tree_util.h"
+#ifdef DEBUG
+# define IF_DEBUG(X) X
+#else
+# define IF_DEBUG(X)
+#endif
+
/*
** Forward declarations
*/
@@ -158,6 +164,14 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent);
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent);
+
+
/*
** External interface
*/
@@ -210,62 +224,37 @@ DbTableMethod db_catree =
* Internal CA tree related helper functions and macros
*/
-#define GET_ROUTE_NODE_KEY(node) (node->baseOrRoute.route.key.tpl[0])
-#define GET_BASE_NODE_LOCK(node) (&(node->baseOrRoute.base.lock))
-#define GET_ROUTE_NODE_LOCK(node) (&(node->baseOrRoute.route.lock))
+#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term)
+#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
+#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))
/* Helpers for reading and writing shared atomic variables */
/* No memory barrier */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
-#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Release or acquire barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
-#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Compares a key to the key in a route node */
-static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
- Eterm key,
+static ERTS_INLINE Sint cmp_key_route(Eterm key,
DbTableCATreeNode *obj)
{
return CMP(key, GET_ROUTE_NODE_KEY(obj));
}
-static ERTS_INLINE void push_node_dyn_array(DbTable *tb,
- CATreeNodeStack *stack,
- DbTableCATreeNode *node)
-{
- int i;
- if (stack->pos == stack->size) {
- DbTableCATreeNode **newArray =
- erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
- sizeof(DbTableCATreeNode*) * (stack->size*2));
- for (i = 0; i < stack->pos; i++) {
- newArray[i] = stack->array[i];
- }
- if (stack->size > STACK_NEED) {
- /* Dynamically allocated array that needs to be deallocated */
- erts_db_free(ERTS_ALC_T_DB_STK, tb,
- stack->array,
- sizeof(DbTableCATreeNode *) * stack->size);
- }
- stack->array = newArray;
- stack->size = stack->size*2;
- }
- PUSH_NODE(stack, node);
-}
-
/*
* Used by the split_tree function
*/
@@ -279,7 +268,7 @@ int less_than_two_elements(TreeDbTerm *root)
* Inserts a TreeDbTerm into a tree. Returns the new root.
*/
static ERTS_INLINE
-TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
+TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
TreeDbTerm *insert_to_root,
TreeDbTerm *value_to_insert) {
/* Non recursive insertion in AVL tree, building our own stack */
@@ -295,7 +284,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
int dir;
TreeDbTerm *p1, *p2, *p;
- key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);
+ key = GETKEY(tb, value_to_insert->dbterm.tpl);
dstack[dpos++] = DIR_END;
for (;;)
@@ -305,7 +294,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
(*this)->balance = 0;
(*this)->left = (*this)->right = NULL;
break;
- } else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
+ } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
/* go lefts */
dstack[dpos++] = DIR_LEFT;
tstack[tpos++] = this;
@@ -392,7 +381,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
* left_wb and the tree containing the rest of the keys in the write
* back parameter right_wb.
*/
-static void split_tree(DbTableCommon *tb,
+static void split_tree(DbTableCATree *tb,
TreeDbTerm *root,
TreeDbTerm **split_key_node_wb,
TreeDbTerm **left_wb,
@@ -413,9 +402,7 @@ static void split_tree(DbTableCommon *tb,
split_node->left = NULL;
right_root = split_node->right;
split_node->right = NULL;
- right_root = insert_TreeDbTerm(tb,
- right_root,
- split_node);
+ right_root = insert_TreeDbTerm(tb, right_root, split_node);
*split_key_node_wb = split_node;
*left_wb = left_root;
*right_wb = right_root;
@@ -581,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
p = *this;
if (dir == DIR_LEFT) {
switch (p->balance) {
- case 1:
+ case 1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = -1;
break;
- case -1: /* The icky case */
+ case -1: /* The icky case */
p1 = p->left;
if (p1->balance == -1) { /* Single LL rotation */
p->left = p1->right;
@@ -611,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
} else { /* dir == DIR_RIGHT */
switch (p->balance) {
- case -1:
+ case -1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = 1;
break;
- case 1:
+ case 1:
p1 = p->right;
if (p1->balance == 1) { /* Single RR rotation */
p->right = p1->left;
@@ -649,6 +636,31 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
}
+#ifdef DEBUG
+# define FORCE_RANDOM_SPLIT_JOIN
+#endif
+#ifdef FORCE_RANDOM_SPLIT_JOIN
+static int dbg_fastrand(void)
+{
+ static int g_seed = 648835;
+ g_seed = (214013*g_seed+2531011);
+ return (g_seed>>16)&0x7FFF;
+}
+
+static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node)
+{
+ switch (dbg_fastrand() % 8) {
+ case 1:
+ base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT;
+ break;
+ case 2:
+ base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT;
+ break;
+ }
+}
+#else
+# define dbg_maybe_force_splitjoin(N)
+#endif /* FORCE_RANDOM_SPLIT_JOIN */
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
@@ -660,9 +672,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
* Locks a base node without adjusting the lock statistics
*/
static ERTS_INLINE
-void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
+void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rwlock(&base_node->u.base.lock);
}
/*
@@ -670,255 +683,345 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
* the lock was contended or not
*/
static ERTS_INLINE
-void wlock_base_node(DbTableCATreeBaseNode *base_node)
+void wlock_base_node(DbTableCATreeNode *base_node)
{
- if (try_wlock_base_node(base_node)) {
+ ASSERT(base_node->is_base_node);
+ if (try_wlock_base_node(&base_node->u.base)) {
/* The lock is contended */
wlock_base_node_no_stats(base_node);
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
} else {
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
}
}
static ERTS_INLINE
-void wunlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwunlock(&base_node->lock);
+ erts_rwmtx_rwunlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void rlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_adapt_base_node(DbTableCATree* tb,
+ DbTableCATreeNode* node,
+ DbTableCATreeNode* parent,
+ int current_level)
+{
+ dbg_maybe_force_splitjoin(node);
+ if ((!node->u.base.root && parent && !(tb->common.status
+ & DB_CATREE_FORCE_SPLIT))
+ || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
+ join_catree(tb, node, parent);
+ }
+ else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
+ split_catree(tb, node, parent);
+ }
+ else {
+ wunlock_base_node(node);
+ }
+}
+
+static ERTS_INLINE
+void rlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void runlock_base_node(DbTableCATreeBaseNode *base_node)
+void runlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_runlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_runlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void lock_route_node(DbTableCATreeRouteNode *route_node)
+void lock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_lock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_lock(&route_node->u.route.lock);
}
static ERTS_INLINE
-void unlock_route_node(DbTableCATreeRouteNode *route_node)
+void unlock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_unlock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_unlock(&route_node->u.route.lock);
}
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+ iter->search_key = NULL;
+}
-/*
- * The following macros are used to create the ETS functions that only
- * need to access one element (e.g. db_put_catree, db_get_catree and
- * db_erase_catree).
- */
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeNode *base_node,
+ DbTableCATreeNode *parent,
+ int current_level)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else {
+ wlock_base_node(base_node);
+ iter->bnode_parent = parent;
+ iter->bnode_level = current_level;
+ }
+ iter->locked_bnode = base_node;
+}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
- int retry; \
- DbTableCATreeNode *current_node; \
- DbTableCATreeNode *prev_node; \
- DbTableCommon* common_table_data = &tb->common; \
- DbTableCATreeBaseNode *base_node; \
- int current_level; \
- (void)prev_node; \
- do { \
- retry = 0; \
- current_node = GET_ROOT_ACQB(tb); \
- prev_node = NULL; \
- current_level = 0; \
- while ( ! current_node->is_base_node ) { \
- current_level = current_level + 1; \
- prev_node = current_node; \
- if (cmp_key_route(common_table_data,key,current_node) < 0) { \
- current_node = GET_LEFT_ACQB(current_node); \
- } else { \
- current_node = GET_RIGHT_ACQB(current_node); \
- } \
- } \
- base_node = &current_node->baseOrRoute.base; \
- LOCK(base_node); \
- if ( ! base_node->is_valid ) { \
- /* Retry */ \
- UNLOCK(base_node); \
- retry = 1; \
- } \
- } while(retry);
-
-
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
- && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(&tb->common, prev_node, current_node); \
- } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
- join_catree(tb, prev_node, current_node); \
- } else { \
- wunlock_base_node(base_node); \
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else if (iter->locked_bnode->u.base.is_valid) {
+ wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
+ iter->bnode_parent, iter->bnode_level);
}
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+static ERTS_INLINE
+void destroy_root_iterator(CATreeRootIterator* iter)
+{
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+ if (iter->search_key)
+ erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
+}
+
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
- result = SEQUENTAIL_CALL; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- return result; \
+static ERTS_INLINE
+DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ if (fbn) {
+ fbn->parent = NULL;
+ fbn->current_level = 0;
}
+ while (!node->is_base_node) {
+ if (fbn) {
+ fbn->current_level++;
+ fbn->parent = node;
+ }
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return node;
+}
+static ERTS_INLINE
+DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
+{
+ DbTableCATreeNode* base_node;
-#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
- result = SEQUENTAIL_CALL; \
- runlock_base_node(base_node); \
- return result; \
+ while (1) {
+ base_node = find_base_node(tb, key, NULL);
+ rlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ runlock_base_node(base_node);
}
+ return base_node;
+}
+static ERTS_INLINE
+DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(tb, key, fbn);
+ wlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ wunlock_base_node(base_node);
+ }
+ return base_node;
+}
+static ERTS_INLINE
+Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+{
+ dst->size = key_size;
+ if (key_size != 0) {
+ Eterm* hp = &dst->heap[0];
+ ErlOffHeap tmp_offheap;
+ tmp_offheap.first = NULL;
+ dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
+ dst->oh = tmp_offheap.first;
+ }
+ else {
+ ASSERT(is_immed(key));
+ dst->term = key;
+ dst->oh = NULL;
+ }
+ return dst->term;
+}
-static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
+static ERTS_INLINE
+void destroy_route_key(DbRouteKey* key)
{
- DbTableCATreeNode *new_base_node_container =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) tb,
- sizeof(DbTableCATreeNode));
- DbTableCATreeBaseNode *new_base_node =
- &new_base_node_container->baseOrRoute.base;
+ if (key->oh) {
+ ErlOffHeap oh;
+ oh.first = key->oh;
+ erts_cleanup_offheap(&oh);
+ }
+}
+
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+# define LC_ORDER(ORDER) ORDER
+#else
+# define LC_ORDER(ORDER) NIL
+#endif
+
+#define sizeof_base_node() \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+
+static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
+ TreeDbTerm* root)
+{
+ DbTableCATreeNode *p;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
- new_base_node_container->is_base_node = 1;
- new_base_node->root = NULL;
+ p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
+ sizeof_base_node());
+
+ p->is_base_node = 1;
+ p->u.base.root = root;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
- erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
- "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
- new_base_node->lock_statistics = 0;
- new_base_node->is_valid = 1;
- new_base_node->tab = (DbTable *) tb;
- return new_base_node_container;
+
+ erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
+ "erl_db_catree_base_node",
+ NIL,
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT)
+ ? INT_MAX : 0);
+ p->u.base.is_valid = 1;
+ return p;
+}
+
+static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
+{
+ return (offsetof(DbTableCATreeNode, u.route.key.heap)
+ + key_size*sizeof(Eterm));
}
static DbTableCATreeNode*
-create_catree_route_node(DbTableCommon * common_table_data,
- DbTableCATreeNode *left,
- DbTableCATreeNode *right,
- DbTerm * keyTerm)
+create_route_node(DbTableCATree *tb,
+ DbTableCATreeNode *left,
+ DbTableCATreeNode *right,
+ DbTerm * keyTerm,
+ DbTableCATreeNode* lc_parent)
{
- Eterm* top;
- Eterm key = GETKEY(common_table_data,keyTerm->tpl);
+ Eterm key = GETKEY(tb,keyTerm->tpl);
int key_size = size_object(key);
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- size_t route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- ErlOffHeap tmp_offheap;
- byte* new_route_node_container_bytes =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) common_table_data,
- route_node_container_size);
- DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
- DbTableCATreeNode *new_route_node_container =
- (DbTableCATreeNode*)new_route_node_container_bytes;
- DbTableCATreeRouteNode *new_route_node =
- &new_route_node_container->baseOrRoute.route;
- new_route_node->tab = (DbTable *)common_table_data;
- if (key_size != 0) {
- newp->size = key_size;
- top = &newp->tpl[1];
- tmp_offheap.first = NULL;
- newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
- newp->first_oh = tmp_offheap.first;
- } else {
- newp->size = key_size;
- newp->first_oh = NULL;
- newp->tpl[0] = key;
- }
- new_route_node_container->is_base_node = 0;
- new_route_node->is_valid = 1;
- erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
- erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
- erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
- NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
- return new_route_node_container;
+ DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof_route_node(key_size));
+
+ copy_route_key(&p->u.route.key, key, key_size);
+ p->is_base_node = 0;
+ p->u.route.is_valid = 1;
+ erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
+ erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ /* Route node lock order is inverse tree depth (from leafs toward root) */
+ p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
+ lc_parent->u.route.lc_order - 1);
+ /*
+ * This assert may eventually fail as we don't increase 'lc_order' in join
+ * operations when route nodes move up in the tree.
+ * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
+ */
+ ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
+#endif
+ erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
+ LC_ORDER(make_small(p->u.route.lc_order)),
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ return p;
}
-static void free_catree_base_node(void* base_node_container_ptr)
+static void do_free_base_node(void* vptr)
{
- DbTableCATreeNode *base_node_container =
- (DbTableCATreeNode *)base_node_container_ptr;
- DbTableCATreeBaseNode *base_node =
- &base_node_container->baseOrRoute.base;
- erts_rwmtx_destroy(&base_node->lock);
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- base_node->tab, base_node_container,
- sizeof(DbTableCATreeNode));
-}
-
-static void free_catree_routing_node(void *route_node_container_ptr)
-{
- size_t route_node_container_size;
- byte* route_node_container_bytes = route_node_container_ptr;
- DbTableCATreeNode *route_node_container =
- (DbTableCATreeNode *)route_node_container_bytes;
- DbTableCATreeRouteNode *route_node =
- &route_node_container->baseOrRoute.route;
- int key_size = route_node->key.size;
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- ErlOffHeap tmp_oh;
- DbTerm* db_term = (DbTerm*) (route_node_container_bytes + offset);
- erts_mtx_destroy(&route_node->lock);
- route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- if (key_size != 0) {
- tmp_oh.first = db_term->first_oh;
- erts_cleanup_offheap(&tmp_oh);
- }
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- route_node->tab,
- route_node_container,
- route_node_container_size);
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(p->is_base_node);
+ erts_rwmtx_destroy(&p->u.base.lock);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
+ do_free_base_node(p);
+}
+
+static void do_free_route_node(void *vptr)
+{
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(!p->is_base_node);
+ erts_mtx_destroy(&p->u.route.lock);
+ destroy_route_key(&p->u.route.key);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(!p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
+ do_free_route_node(p);
}
+
/*
* Returns the parent routing node of the specified
- * route_node_container if such a routing node exists or NULL if
- * route_node_container is attached to the root
+ * route node 'child' if such a parent exists
+ * or NULL if 'child' is attached to the root.
*/
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
- DbTableCATreeNode *route_node_container)
+ DbTableCATreeNode *child)
{
+ Eterm key = GET_ROUTE_NODE_KEY(child);
+ DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev = NULL;
- Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
- DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
- DbTableCATreeNode *prev_node = NULL;
- if (current_node == route_node_container) {
- return NULL;
- }
- while (current_node != route_node_container) {
- prev_node = current_node;
- if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
- current_node = GET_LEFT_ACQB(current_node);
+ while (current != child) {
+ prev = current;
+ if (cmp_key_route(key, current) < 0) {
+ current = GET_LEFT_ACQB(current);
} else {
- current_node = GET_RIGHT_ACQB(current_node);
+ current = GET_RIGHT_ACQB(current);
}
}
- return prev_node;
+ return prev;
}
@@ -953,11 +1056,7 @@ leftmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_LEFT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -969,162 +1068,7 @@ rightmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_RIGHT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
-}
-
-static ERTS_INLINE DbTableCATreeNode*
-leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
-{
- DbTableCATreeNode * node = root;
- while (!node->is_base_node) {
- PUSH_NODE(stack, node);
- node = GET_LEFT_ACQB(node);
- }
- return node;
-}
-
-static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCommon *common_table_data,
- DbTableCATreeNode *base_node,
- CATreeNodeStack *stack)
-{
- if (EMPTY_NODE(stack)) { /* The parent of b is the root */
- return NULL;
- } else {
- if (GET_LEFT(TOP_NODE(stack)) == base_node) {
- return leftmost_base_node_and_path(
- GET_RIGHT_ACQB(TOP_NODE(stack)),
- stack);
- } else {
- Eterm pkey =
- TOP_NODE(stack)->baseOrRoute.route.key.tpl[0]; /* pKey = key of parent */
- POP_NODE(stack);
- while (!EMPTY_NODE(stack)) {
- if (TOP_NODE(stack)->baseOrRoute.route.is_valid &&
- cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
- return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
- } else {
- POP_NODE(stack);
- }
- }
- }
- return NULL;
- }
-}
-
-static ERTS_INLINE void
-clone_stack(CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *search_stack_copy_ptr)
-{
- int i;
- search_stack_copy_ptr->pos = search_stack_ptr->pos;
- for (i = 0; i < search_stack_ptr->pos; i++) {
- search_stack_copy_ptr->array[i] = search_stack_ptr->array[i];
- }
-}
-
-
-
-static ERTS_INLINE DbTableCATreeNode*
-lock_first_base_node(DbTable *tbl,
- CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- int retry;
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- current_node = GET_LEFT_ACQB(current_node);
- }
- base_node = &current_node->baseOrRoute.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return current_node;
-}
-
-static ERTS_INLINE DbTableCATreeBaseNode*
-find_and_lock_next_base_node_and_path(DbTable *tbl,
- CATreeNodeStack **search_stack_ptr_ptr,
- CATreeNodeStack **search_stack_copy_ptr_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- CATreeNodeStack * tmp_stack_ptr;
- DbTableCommon* common_table_data;
- retry_find_and_lock_next_base_node:
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- common_table_data = &tbl->common;
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->baseOrRoute.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- /* Revert to previous state */
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- tmp_stack_ptr = *search_stack_ptr_ptr;
- *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
- *search_stack_copy_ptr_ptr = tmp_stack_ptr;
- goto retry_find_and_lock_next_base_node;
- } else {
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- }
- return base_node;
-}
-
-static ERTS_INLINE
-void unlock_and_release_locked_base_node_stack(DbTable *tbl,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- int i;
- for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
- current_node = locked_base_nodes_stack_ptr->array[i];
- base_node = &current_node->baseOrRoute.base;
- if (locked_base_nodes_stack_ptr->pos > 1) {
- base_node->lock_statistics = /* This is not atomic which is fine as */
- base_node->lock_statistics + /* correctness does not depend on that. */
- ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION;
- }
- runlock_base_node(base_node);
- }
- if (locked_base_nodes_stack_ptr->size > STACK_NEED) {
- erts_db_free(ERTS_ALC_T_DB_STK, tbl,
- locked_base_nodes_stack_ptr->array,
- sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size);
- }
-}
-
-static ERTS_INLINE
-void init_stack(CATreeNodeStack *stack,
- DbTableCATreeNode **stack_array,
- Uint init_size)
-{
- stack->array = stack_array;
- stack->pos = 0;
- stack->size = init_size;
+ return prev_node;
}
static ERTS_INLINE
@@ -1137,404 +1081,195 @@ void init_tree_stack(DbTreeStack *stack,
stack->slot = init_slot;
}
-#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \
- CATreeNodeStack STACK_NAME; \
- CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \
- DbTableCATreeNode *STACK_NAME##_array[STACK_NEED];
-
-#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \
-init_stack(&search_stack, search_stack_array, 0); \
-init_stack(&search_stack_copy, search_stack_copy_array, 0); \
-init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/
-
-/*
- * Locks and returns the base node that contains the specified key if
- * it is present. The function saves the search path to the found base
- * node in search_stack_ptr and adds the found base node to
- * locked_base_nodes_stack_ptr.
- */
-static ERTS_INLINE DbTableCATreeBaseNode *
-lock_base_node_with_key(DbTable *tbl,
- Eterm key,
- CATreeNodeStack * search_stack_ptr,
- CATreeNodeStack * locked_base_nodes_stack_ptr)
-{
- int retry;
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- DbTableCommon* common_table_data = &tbl->common;
- do {
- retry = 0;
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
- current_node = GET_LEFT_ACQB(current_node);
- } else {
- current_node = GET_RIGHT_ACQB(current_node);
- }
- }
- base_node = &current_node->baseOrRoute.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return base_node;
-}
-
-/*
- * Joins a base node with it's right neighbor. Returns the base node
- * resulting from the join in locked state or NULL if there is no base
- * node to join with.
- */
-static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCommon *common_table_data,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container,
- DbTableCATreeNode **result_parent_wb)
-{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- int neighbour_not_valid;
- if (parent_container == NULL) {
- return NULL;
- }
- parent = &parent_container->baseOrRoute.route;
- do {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- wlock_base_node_no_stats(neighbor_base);
- neighbour_not_valid = !neighbor_base->is_valid;
- if (neighbour_not_valid) {
- wunlock_base_node(neighbor_base);
- }
- } while (neighbour_not_valid);
- lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
- gparent = NULL;
- gparent_container = NULL;
- do {
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
- lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
- if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
- } else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
- }
- unlock_route_node(parent);
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- wlock_base_node_no_stats(&(new_neighbor_base->baseOrRoute.base));
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
- } else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
- }
- if(neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
- } else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
- }
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
- /* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-
- if (parent_container == neighbor_base_container) {
- *result_parent_wb = gparent_container;
- } else {
- *result_parent_wb = neighbor_base_parent;
- }
- return new_neighbor_base;
-}
-
-/*
- * Used to merge together all base nodes for operations such as last
- * and select_*. Returns the base node resulting from the merge in
- * locked state.
- */
-static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCommon * common_table_data)
-{
- DbTableCATreeNode *parent_container;
- DbTableCATreeNode *new_parent_container;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *base_container;
- DbTableCATreeNode *new_base_container;
- int is_not_valid;
- /* Find first base node */
- do {
- parent_container = NULL;
- base_container = GET_ROOT_ACQB(tb);
- while ( ! base_container->is_base_node ) {
- parent_container = base_container;
- base_container = GET_LEFT_ACQB(base_container);
- }
- wlock_base_node_no_stats(&(base_container->baseOrRoute.base));
- is_not_valid = ! base_container->baseOrRoute.base.is_valid;
- if (is_not_valid) {
- wunlock_base_node(&(base_container->baseOrRoute.base));
- }
- } while(is_not_valid);
- do {
- new_base_container = erl_db_catree_force_join_right(common_table_data,
- parent_container,
- base_container,
- &new_parent_container);
- if (new_base_container != NULL) {
- base_container = new_base_container;
- parent_container = new_parent_container;
- }
- } while(new_base_container != NULL);
- return base_container;
-}
-
-
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container)
-{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- if (parent_container == NULL) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent)
+{
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+
+ ASSERT(thiz->is_base_node);
+ if (parent == NULL) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
return;
}
- parent = &parent_container->baseOrRoute.route;
- if (GET_LEFT(parent_container) == base_container) {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(!parent->is_base_node);
+ if (GET_LEFT(parent) == thiz) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL)
lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(thiz->u.base.root,
+ neighbor->u.base.root);
+ new_neighbor = create_base_node(tb, new_root);
+ }
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
}
} else { /* Symetric case */
- neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(GET_RIGHT(parent) == thiz);
+ neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
lock_route_node(gparent);
} else {
gparent = NULL;
}
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_LEFT(parent_container));
- } else if (GET_RIGHT(gparent_container) == parent_container) {
- SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_ROOT_RELB(tb, GET_LEFT(parent));
+ } else if (GET_RIGHT(gparent) == parent) {
+ SET_RIGHT_RELB(gparent, GET_LEFT(parent));
} else {
- SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_LEFT_RELB(gparent, GET_LEFT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(neighbor_base->root, base->root);
- neighbor_base_parent = NULL;
- if (GET_LEFT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
+ thiz->u.base.root);
+ new_neighbor = create_base_node(tb, new_root);
+ }
+ if (GET_LEFT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- rightmost_route_node(GET_LEFT(parent_container));
+ neighbor_parent =
+ rightmost_route_node(GET_LEFT(parent));
}
}
}
/* Link in new neighbor and free nodes that are no longer in the tree */
- if (neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if (neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
/* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-}
-
-
-static void split_catree(DbTableCommon *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container) {
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node());
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node());
+}
+
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent)
+{
TreeDbTerm *splitOutWriteBack;
- TreeDbTerm *leftWriteBack;
- TreeDbTerm *rightWriteBack;
- DbTableCATreeNode *left_base_node;
- DbTableCATreeNode *right_base_node;
- DbTableCATreeNode *routing_node_container;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeRouteNode *parent;
- if (parent_container == NULL) {
- parent = NULL;
- } else {
- parent = &parent_container->baseOrRoute.route;
- }
+ DbTableCATreeNode* ERTS_RESTRICT new_left;
+ DbTableCATreeNode* ERTS_RESTRICT new_right;
+ DbTableCATreeNode* ERTS_RESTRICT new_route;
- if (less_than_two_elements(base->root)) {
- base->lock_statistics = 0;
+ if (less_than_two_elements(base->u.base.root)) {
+ if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
+ base->u.base.lock_statistics = 0;
wunlock_base_node(base);
return;
} else {
- /* Split the tree */
- split_tree(tb,
- base->root,
- &splitOutWriteBack,
- &leftWriteBack,
- &rightWriteBack);
- /* Create new base nodes */
- left_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- right_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- left_base_node->baseOrRoute.base.root = leftWriteBack;
- right_base_node->baseOrRoute.base.root = rightWriteBack;
- routing_node_container = create_catree_route_node(tb,
- left_base_node,
- right_base_node,
- &splitOutWriteBack->dbterm);
+ TreeDbTerm *left_tree;
+ TreeDbTerm *right_tree;
+
+ split_tree(tb, base->u.base.root, &splitOutWriteBack,
+ &left_tree, &right_tree);
+
+ new_left = create_base_node(tb, left_tree);
+ new_right = create_base_node(tb, right_tree);
+ new_route = create_route_node(tb,
+ new_left,
+ new_right,
+ &splitOutWriteBack->dbterm,
+ parent);
if (parent == NULL) {
- SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
- } else if(GET_LEFT(parent_container) == base_container) {
- SET_LEFT_RELB(parent_container, routing_node_container);
+ SET_ROOT_RELB(tb, new_route);
+ } else if(GET_LEFT(parent) == base) {
+ SET_LEFT_RELB(parent, new_route);
} else {
- SET_RIGHT_RELB(parent_container, routing_node_container);
+ SET_RIGHT_RELB(parent, new_route);
}
- base->is_valid = 0;
+ base->u.base.is_valid = 0;
wunlock_base_node(base);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ base,
+ &base->u.base.free_item,
+ sizeof_base_node());
}
}
@@ -1546,7 +1281,7 @@ static void catree_add_base_node_to_free_list(
DbTableCATree *tb,
DbTableCATreeNode *base_node_container)
{
- base_node_container->baseOrRoute.base.next =
+ base_node_container->u.base.next =
tb->base_nodes_to_free_list;
tb->base_nodes_to_free_list = base_node_container;
}
@@ -1557,7 +1292,7 @@ static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
return; /* List empty */
} else {
DbTableCATreeNode *first = tb->base_nodes_to_free_list;
- tb->base_nodes_to_free_list = first->baseOrRoute.base.next;
+ tb->base_nodes_to_free_list = first->u.base.next;
}
}
@@ -1596,7 +1331,7 @@ static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left
PUSH_NODE(&tb->free_stack_rnodes, root);
root = p;
} else {
- free_catree_routing_node(root);
+ free_catree_route_node(tb, root);
if (--num_left >= 0) {
break;
} else {
@@ -1637,10 +1372,10 @@ static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
}
}
catree_deque_base_node_from_free_list(tb);
- free_catree_base_node(base_node_container);
+ free_catree_base_node(tb, base_node_container);
base_node_container = catree_first_base_node_from_free_list(tb);
if (base_node_container != NULL) {
- PUSH_NODE(&tb->free_stack_elems, base_node_container->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root);
}
return num_left;
}
@@ -1662,7 +1397,9 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_catree_base_node(tb);
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL);
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1671,177 +1408,447 @@ int db_create_catree(Process *p, DbTable *tbl)
static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATreeBaseNode *base_node;
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
int result;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- /* Find first base node */
- base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->baseOrRoute.base);
- /* Find next base node until non-empty base node is found */
- while (base_node != NULL && base_node->root == NULL) {
- base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_first_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
+ root = pp ? *pp : NULL;
}
- /* Get the return value */
- result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL);
- /* Unlock base nodes */
- unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr);
+
+ result = db_first_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
- DbTreeStack next_search_stack;
- TreeDbTerm *next_search_stack_array[STACK_NEED];
- DbTableCATreeBaseNode *base_node;
- int result = 0;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- init_tree_stack(&next_search_stack, next_search_stack_array, 0);
- /* Lock base node with key */
- base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack);
- /* Continue until key is not >= greatest key in base_node */
- while (base_node != NULL) {
- result = db_next_tree_common(p, tbl, base_node->root, key,
- ret, &next_search_stack);
- if (result != DB_ERROR_NONE || *ret != am_EOT) {
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_next_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
break;
- }
- base_node =
- find_and_lock_next_base_node_and_path(tbl,
- &search_stack_ptr,
- &search_stack_copy_ptr,
- locked_base_nodes_stack_ptr);
- }
- unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack);
+
+ rootp = catree_find_next_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- int result = db_last_tree_common(p, tbl, base->baseOrRoute.base.root, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_last_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
+ root = pp ? *pp : NULL;
+ }
+
+ result = db_last_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
-
}
static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
- DbTableCATree *tb = &tbl->catree;
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
int result;
- DbTableCATreeNode *base;
- init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_prev_tree_common(p, tbl, base->baseOrRoute.base.root, key, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_prev_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
+ &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
+ break;
+ rootp = catree_find_prev_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
- ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
- db_put_tree_common(&tb->common,
- &base_node->root,
- obj,
- key_clash_fail,
- NULL);)
-
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(obj));
- return erl_db_catree_do_operation_put(tb,
- key,
- obj,
- key_clash_fail);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
+ key_clash_fail, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
- ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
- db_get_tree_common(p, &tb->common,
- base_node->root,
- key, ret, NULL);)
-
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get(tb, key, p, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
+ if (base_node->u.base.is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->u.base.root;
}
-#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
- ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
- db_member_tree_common(&tb->common,
- base_node->root,
- key,
- ret,
- NULL);)
+static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key)
+{
+ Uint key_size;
+
+ if (is_immed(key))
+ return key;
+
+ if (iter->search_key) {
+ if (key == iter->search_key->term)
+ return key; /* already saved */
+ destroy_route_key(iter->search_key);
+ }
+ key_size = size_object(key);
+ if (!iter->search_key || key_size > iter->search_key->size) {
+ iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
+ iter->search_key,
+ (offsetof(DbRouteKey, heap)
+ + key_size*sizeof(Eterm)));
+ }
+ return copy_route_key(iter->search_key, key, key_size);
+}
+
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter,
+ int forward,
+ Eterm *search_keyp)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_invalid = NULL;
+ DbTableCATreeNode *rejected_empty = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ Eterm route_key = iter->next_route_key;
+ int current_level;
+
+ if (iter->locked_bnode) {
+ if (search_keyp)
+ *search_keyp = save_iter_search_key(iter, *search_keyp);
+ unlock_iter_base_node(iter);
+ }
+
+ if (is_non_value(route_key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (forward) {
+ if (cmp_key_route(route_key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(route_key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_invalid);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ ASSERT(node != rejected_empty);
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = node;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ route_key = next_route_node->u.route.key.term;
+ IF_DEBUG(rejected_empty = node);
+ }
+ else
+ IF_DEBUG(rejected_invalid = node);
+
+ /* Retry */
+ unlock_iter_base_node(iter);
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 1, keyp);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 0, keyp);
+}
+
+/* @brief Find root of tree where object with smallest key of all larger than
+ * partially bound key may reside. Can be used as a starting point for
+ * a reverse iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param iter An initialized root iterator.
+ *
+ * @return Pointer to found root pointer. May not be NULL.
+ */
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+/* @brief Find root of tree where object with largest key of all smaller than
+ * partially bound key may reside. Can be used as a starting point for
+ * a forward iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param iter An initialized root iterator.
+ *
+ * @return Pointer to found root pointer. May not be NULL.
+ */
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, next_route_node, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_member(tb, key, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_member_tree_common(&tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
- ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
- db_get_element_tree_common(p, &tb->common,
- base_node->root,
- key, ndex,
- ret, NULL))
-
static int db_get_element_catree(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_element_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ndex, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
- ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
- db_erase_tree_common(tbl,
- &base_node->root,
- key,
- ret,
- NULL);)
-
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_tree_common(tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
- ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
- db_erase_object_tree_common(tbl,
- &base_node->root,
- object,
- ret,
- NULL);)
-
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(object));
- return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_object_tree_common(tbl,
+ &node->u.base.root,
+ object,
+ ret,
+ NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_slot_tree_common(p, tbl, base->baseOrRoute.base.root,
- slot_term, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
+ slot_term, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1850,28 +1857,26 @@ static int db_select_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_continue_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
- tid, pattern, reverse, ret,
- NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1880,27 +1885,27 @@ static int db_select_count_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1908,13 +1913,14 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reversed, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_chunk_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
- tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1923,74 +1929,71 @@ static int db_select_delete_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_continue_tree_common(p, tbl, &base->baseOrRoute.base.root,
- continuation, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ destroy_root_iterator(&iter);
return result;
}
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_tree_common(p, tbl, &base->baseOrRoute.base.root,
- tid, pattern, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
- ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
- db_take_tree_common(p, tbl,
- &base_node->root,
- key, ret, NULL);)
-
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
/*
@@ -2002,10 +2005,16 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_print_tree_common(to, to_arg, show, base->baseOrRoute.base.root, tbl);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_print_tree_common(to, to_arg, show, *root, tbl);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
/* Release all memory occupied by a single table */
@@ -2045,7 +2054,7 @@ static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
} else {
tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
first_base_node = catree_first_base_node_from_free_list(tb);
- PUSH_NODE(&tb->free_stack_elems, first_base_node->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root);
}
}
while (catree_first_base_node_from_free_list(tb) != NULL) {
@@ -2081,26 +2090,33 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_foreach_offheap_tree_common(base->baseOrRoute.base.root, func, arg);
- wunlock_base_node(&(base->baseOrRoute.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_foreach_offheap_tree_common(*root, func, arg);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle *handle)
{
DbTableCATree *tb = &tbl->catree;
- int res;
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
- res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
+ obj, handle, NULL);
if (res == 0) {
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
} else {
/* db_finalize_dbterm_catree will unlock */
- handle->lck = prev_node;
- handle->lck2 = current_node;
- handle->current_level = current_level;
+ handle->u.catree.base_node = node;
+ handle->u.catree.parent = fbn.parent;
+ handle->u.catree.current_level = fbn.current_level;
}
return res;
}
@@ -2108,12 +2124,10 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
DbTableCATree *tb = &(handle->tb->catree);
- DbTableCATreeNode *prev_node = handle->lck;
- DbTableCATreeNode *current_node = handle->lck2;
- int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->baseOrRoute.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, handle->u.catree.base_node,
+ handle->u.catree.parent,
+ handle->u.catree.current_level);
return;
}
@@ -2147,6 +2161,61 @@ void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
+void db_catree_force_split(DbTableCATree* tb, int on)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(tb, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+
+ if (on)
+ tb->common.status |= DB_CATREE_FORCE_SPLIT;
+ else
+ tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
+}
+
+void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
+{
+ DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
+ DbTableCATreeNode* node;
+ Uint depth = 0;
+
+ stats->route_nodes = 0;
+ stats->base_nodes = 0;
+ stats->max_depth = 0;
+
+ node = GET_ROOT(tb);
+ do {
+ while (!node->is_base_node) {
+ stats->route_nodes++;
+ ASSERT(depth < sizeof(stack)/sizeof(*stack));
+ stack[depth++] = node; /* PUSH parent */
+ if (stats->max_depth < depth)
+ stats->max_depth = depth;
+ node = GET_LEFT(node);
+ }
+ stats->base_nodes++;
+
+ while (depth > 0) {
+ DbTableCATreeNode* parent = stack[depth-1];
+ if (node == GET_LEFT(parent)) {
+ node = GET_RIGHT(parent);
+ break;
+ }
+ else {
+ ASSERT(node == GET_RIGHT(parent));
+ node = parent;
+ depth--; /* POP parent */
+ }
+ }
+ } while (depth > 0);
+}
#ifdef HARDDEBUG