aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/erl_db_catree.c315
-rw-r--r--erts/emulator/beam/erl_db_catree.h11
-rw-r--r--erts/emulator/beam/erl_db_tree.c201
-rw-r--r--lib/stdlib/test/ets_SUITE.erl4
4 files changed, 318 insertions, 213 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index 975e19a878..072858f3f5 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -158,6 +158,13 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent);
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent);
+
/*
** External interface
@@ -634,9 +641,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
* Locks a base node without adjusting the lock statistics
*/
static ERTS_INLINE
-void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
+void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rwlock(&base_node->u.base.lock);
}
/*
@@ -644,33 +652,54 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
* the lock was contended or not
*/
static ERTS_INLINE
-void wlock_base_node(DbTableCATreeBaseNode *base_node)
+void wlock_base_node(DbTableCATreeNode *base_node)
{
- if (try_wlock_base_node(base_node)) {
+ ASSERT(base_node->is_base_node);
+ if (try_wlock_base_node(&base_node->u.base)) {
/* The lock is contended */
wlock_base_node_no_stats(base_node);
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
} else {
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
}
}
static ERTS_INLINE
-void wunlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwunlock(&base_node->lock);
+ erts_rwmtx_rwunlock(&base_node->u.base.lock);
+}
+
+static ERTS_INLINE
+void wunlock_adapt_base_node(DbTableCATree* tb,
+ DbTableCATreeNode* base_node,
+ DbTableCATreeNode* parent,
+ int current_level)
+{
+ if (base_node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
+ split_catree(tb, base_node, parent);
+ }
+ else if (base_node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
+ join_catree(tb, base_node, parent);
+ }
+ else {
+ wunlock_base_node(base_node);
+ }
}
static ERTS_INLINE
-void rlock_base_node(DbTableCATreeBaseNode *base_node)
+void rlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void runlock_base_node(DbTableCATreeBaseNode *base_node)
+void runlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_runlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_runlock(&base_node->u.base.lock);
}
static ERTS_INLINE
@@ -695,17 +724,23 @@ void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
iter->read_only = read_only;
iter->locked_bnode = NULL;
iter->next_route_key = THE_NON_VALUE;
+ iter->search_key = NULL;
}
static ERTS_INLINE
void lock_iter_base_node(CATreeRootIterator* iter,
- DbTableCATreeBaseNode *base_node)
+ DbTableCATreeNode *base_node,
+ DbTableCATreeNode *parent,
+ int current_level)
{
ASSERT(!iter->locked_bnode);
if (iter->read_only)
rlock_base_node(base_node);
- else
+ else {
wlock_base_node(base_node);
+ iter->bnode_parent = parent;
+ iter->bnode_level = current_level;
+ }
iter->locked_bnode = base_node;
}
@@ -715,12 +750,23 @@ void unlock_iter_base_node(CATreeRootIterator* iter)
ASSERT(iter->locked_bnode);
if (iter->read_only)
runlock_base_node(iter->locked_bnode);
+ else if (iter->locked_bnode->u.base.is_valid) {
+ wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
+ iter->bnode_parent, iter->bnode_level);
+ }
else
wunlock_base_node(iter->locked_bnode);
iter->locked_bnode = NULL;
}
-
+static ERTS_INLINE
+void destroy_root_iterator(CATreeRootIterator* iter)
+{
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+ if (iter->search_key)
+ erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
+}
/*
* The following macros are used to create the ETS functions that only
@@ -736,8 +782,8 @@ typedef struct
} FindBaseNode;
static ERTS_INLINE
-DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
- FindBaseNode* fbn)
+DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
{
DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
fbn->parent = NULL;
@@ -751,7 +797,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
node = GET_RIGHT_ACQB(node);
}
}
- return &node->u.base;
+ return node;
}
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
@@ -776,25 +822,14 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
} \
} \
base_node = &current_node->u.base; \
- LOCK(base_node); \
+ LOCK(current_node); \
if ( ! base_node->is_valid ) { \
/* Retry */ \
- UNLOCK(base_node); \
+ UNLOCK(current_node); \
retry = 1; \
} \
} while(retry);
-
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
- && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(tb, prev_node, current_node); \
- } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
- join_catree(tb, prev_node, current_node); \
- } else { \
- wunlock_base_node(base_node); \
- }
-
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
Eterm key, \
@@ -802,7 +837,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
int result; \
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
result = SEQUENTAIL_CALL; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
+ wunlock_adapt_base_node(tb, current_node, prev_node, current_level);\
return result; \
}
@@ -814,7 +849,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
int result; \
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
result = SEQUENTAIL_CALL; \
- runlock_base_node(base_node); \
+ runlock_base_node(current_node); \
return result; \
}
@@ -1049,8 +1084,8 @@ void init_tree_stack(DbTreeStack *stack,
}
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent,
- DbTableCATreeNode *thiz)
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent)
{
DbTableCATreeNode *gparent;
DbTableCATreeNode *neighbor;
@@ -1060,7 +1095,7 @@ static void join_catree(DbTableCATree *tb,
ASSERT(thiz->is_base_node);
if (parent == NULL) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
}
ASSERT(!parent->is_base_node);
@@ -1069,12 +1104,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1120,12 +1155,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1177,8 +1212,8 @@ static void join_catree(DbTableCATree *tb,
} else {
SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
/* Free the parent and base */
erts_schedule_db_free(&tb->common,
do_free_route_node,
@@ -1198,8 +1233,9 @@ static void join_catree(DbTableCATree *tb,
}
static void split_catree(DbTableCATree *tb,
- DbTableCATreeNode* ERTS_RESTRICT parent,
- DbTableCATreeNode* ERTS_RESTRICT base) {
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent)
+{
TreeDbTerm *splitOutWriteBack;
DbTableCATreeNode* ERTS_RESTRICT new_left;
DbTableCATreeNode* ERTS_RESTRICT new_right;
@@ -1208,7 +1244,7 @@ static void split_catree(DbTableCATree *tb,
if (less_than_two_elements(base->u.base.root)) {
if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
base->u.base.lock_statistics = 0;
- wunlock_base_node(&base->u.base);
+ wunlock_base_node(base);
return;
} else {
TreeDbTerm *left_tree;
@@ -1234,7 +1270,7 @@ static void split_catree(DbTableCATree *tb,
SET_RIGHT_RELB(parent, new_route);
}
base->u.base.is_valid = 0;
- wunlock_base_node(&base->u.base);
+ wunlock_base_node(base);
erts_schedule_db_free(&tb->common,
do_free_base_node,
base,
@@ -1386,14 +1422,13 @@ static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
init_root_iterator(&tbl->catree, &iter, 1);
root = *catree_find_first_root(&iter);
if (!root) {
- TreeDbTerm **pp = catree_find_next_root(&iter);
+ TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
root = pp ? *pp : NULL;
}
result = db_first_tree_common(p, tbl, root, ret, NULL);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1407,7 +1442,7 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
init_root_iterator(&tbl->catree, &iter, 1);
iter.next_route_key = key;
- rootp = catree_find_next_root(&iter);
+ rootp = catree_find_next_root(&iter, NULL);
do {
init_tree_stack(&stack, stack_array, 0);
@@ -1415,11 +1450,10 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
if (result != DB_ERROR_NONE || *ret != am_EOT)
break;
- rootp = catree_find_next_root(&iter);
+ rootp = catree_find_next_root(&iter, NULL);
} while (rootp);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1432,14 +1466,13 @@ static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
init_root_iterator(&tbl->catree, &iter, 1);
root = *catree_find_last_root(&iter);
if (!root) {
- TreeDbTerm **pp = catree_find_prev_root(&iter);
+ TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
root = pp ? *pp : NULL;
}
result = db_last_tree_common(p, tbl, root, ret, NULL);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1453,7 +1486,7 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
init_root_iterator(&tbl->catree, &iter, 1);
iter.next_route_key = key;
- rootp = catree_find_prev_root(&iter);
+ rootp = catree_find_prev_root(&iter, NULL);
do {
init_tree_stack(&stack, stack_array, 0);
@@ -1461,11 +1494,10 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
&stack);
if (result != DB_ERROR_NONE || *ret != am_EOT)
break;
- rootp = catree_find_prev_root(&iter);
+ rootp = catree_find_prev_root(&iter, NULL);
} while (rootp);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1506,16 +1538,16 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
{
FindBaseNode fbn;
- DbTableCATreeBaseNode* base_node;
+ DbTableCATreeNode* base_node;
while (1) {
base_node = find_base_node(iter->tb, key, &fbn);
- lock_iter_base_node(iter, base_node);
- if (base_node->is_valid)
+ lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
+ if (base_node->u.base.is_valid)
break;
unlock_iter_base_node(iter);
}
- return &base_node->root;
+ return &base_node->u.base.root;
}
@@ -1526,33 +1558,34 @@ TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
DbTableCATreeNode *rejected_base = NULL;
#endif
DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
DbTableCATreeNode* next_route_node;
+ int current_level;
ASSERT(!iter->locked_bnode);
while (1) {
node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
next_route_node = NULL;
while (!node->is_base_node) {
+ current_level++;
+ parent = node;
if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ next_route_node = node;
node = GET_LEFT_ACQB(node);
} else {
- next_route_node = node;
node = GET_RIGHT_ACQB(node);
}
}
ASSERT(node != rejected_base);
- lock_iter_base_node(iter, &node->u.base);
+ lock_iter_base_node(iter, node, parent, current_level);
if (node->u.base.is_valid) {
- if (node->u.base.root || !next_route_node) {
- iter->next_route_key = (next_route_node ?
- next_route_node->u.route.key.term :
- THE_NON_VALUE);
- return &node->u.base.root;
- }
- unlock_iter_base_node(iter);
- iter->next_route_key = next_route_node->u.route.key.term;
- return catree_find_next_root(iter);
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
}
/* Retry */
unlock_iter_base_node(iter);
@@ -1562,25 +1595,57 @@ TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
}
}
-TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next)
+static Eterm copy_iter_search_key(CATreeRootIterator* iter, Eterm key)
+{
+ Uint key_size;
+
+ if (is_immed(key))
+ return key;
+
+ if (iter->search_key) {
+ ASSERT(key != iter->search_key->term);
+ destroy_route_key(iter->search_key);
+ }
+ key_size = size_object(key);
+ if (!iter->search_key || key_size > iter->search_key->size) {
+ iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
+ iter->search_key,
+ (offsetof(DbRouteKey, heap)
+ + key_size*sizeof(Eterm)));
+ }
+ copy_route_key(iter->search_key, key, key_size);
+ return iter->search_key->term;
+}
+
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next,
+ Eterm *keyp)
{
#ifdef DEBUG
DbTableCATreeNode *rejected_base = NULL;
#endif
DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
DbTableCATreeNode* next_route_node;
Eterm key = iter->next_route_key;
+ int current_level;
- if (iter->locked_bnode)
+ if (iter->locked_bnode) {
+ if (keyp)
+ *keyp = copy_iter_search_key(iter, *keyp);
unlock_iter_base_node(iter);
+ }
if (is_non_value(key))
return NULL;
while (1) {
node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
next_route_node = NULL;
while (!node->is_base_node) {
+ current_level++;
+ parent = node;
if (next) {
if (cmp_key_route(key,node) < 0) {
next_route_node = node;
@@ -1599,13 +1664,13 @@ TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next)
}
}
ASSERT(node != rejected_base);
- lock_iter_base_node(iter, &node->u.base);
+ lock_iter_base_node(iter, node, parent, current_level);
if (node->u.base.is_valid) {
if (node->u.base.root) {
iter->next_route_key = (next_route_node ?
next_route_node->u.route.key.term :
THE_NON_VALUE);
- iter->locked_bnode = &node->u.base;
+ iter->locked_bnode = node;
return &node->u.base.root;
}
if (!next_route_node) {
@@ -1622,14 +1687,14 @@ TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next)
}
}
-TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter)
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
{
- return catree_find_nextprev_root(iter, 1);
+ return catree_find_nextprev_root(iter, 1, keyp);
}
-TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter)
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
{
- return catree_find_nextprev_root(iter, 0);
+ return catree_find_nextprev_root(iter, 0, keyp);
}
@@ -1640,14 +1705,20 @@ TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
DbTableCATreeNode *rejected_base = NULL;
#endif
DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
DbTableCATreeNode* next_route_node;
+ int current_level;
ASSERT(!iter->locked_bnode);
while (1) {
node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
next_route_node = NULL;
while (!node->is_base_node) {
+ current_level++;
+ parent = node;
if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) {
next_route_node = node;
node = GET_RIGHT_ACQB(node);
@@ -1656,18 +1727,12 @@ TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
}
}
ASSERT(node != rejected_base);
- lock_iter_base_node(iter, &node->u.base);
+ lock_iter_base_node(iter, node, parent, current_level);
if (node->u.base.is_valid) {
- if (node->u.base.root || !next_route_node) {
- iter->next_route_key = (next_route_node ?
- next_route_node->u.route.key.term :
- THE_NON_VALUE);
- iter->locked_bnode = &node->u.base;
- return &node->u.base.root;
- }
- unlock_iter_base_node(iter);
- iter->next_route_key = next_route_node->u.route.key.term;
- return catree_find_prev_root(iter);
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
}
/* Retry */
unlock_iter_base_node(iter);
@@ -1685,21 +1750,23 @@ static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
#endif
DbTableCATreeNode *node;
DbTableCATreeNode* next_route_node;
+ int current_level;
while (1) {
node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
next_route_node = NULL;
while (!node->is_base_node) {
+ current_level++;
next_route_node = node;
node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
}
ASSERT(node != rejected_base);
- lock_iter_base_node(iter, &node->u.base);
+ lock_iter_base_node(iter, node, next_route_node, current_level);
if (node->u.base.is_valid) {
iter->next_route_key = (next_route_node ?
next_route_node->u.route.key.term :
THE_NON_VALUE);
- iter->locked_bnode = &node->u.base;
return &node->u.base.root;
}
/* Retry */
@@ -1792,8 +1859,7 @@ static int db_slot_catree(Process *p, DbTable *tbl,
init_root_iterator(&tbl->catree, &iter, 1);
result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
slot_term, ret, NULL, &iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1808,9 +1874,7 @@ static int db_select_continue_catree(Process *p,
init_root_iterator(&tbl->catree, &iter, 1);
result = db_select_continue_tree_common(p, &tbl->common,
continuation, ret, NULL, &iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
-
+ destroy_root_iterator(&iter);
return result;
}
@@ -1823,8 +1887,7 @@ static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
init_root_iterator(&tbl->catree, &iter, 1);
result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
NULL, &iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1840,8 +1903,7 @@ static int db_select_count_continue_catree(Process *p,
result = db_select_count_continue_tree_common(p, tbl,
continuation, ret, NULL,
&iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1854,9 +1916,7 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
init_root_iterator(&tbl->catree, &iter, 1);
result = db_select_count_tree_common(p, tbl,
tid, pattern, ret, NULL, &iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
-
+ destroy_root_iterator(&iter);
return result;
}
@@ -1871,8 +1931,7 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
result = db_select_chunk_tree_common(p, tbl,
tid, pattern, chunk_size, reversed, ret,
NULL, &iter);
- if (iter.locked_bnode)
- runlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1890,8 +1949,7 @@ static int db_select_delete_continue_catree(Process *p,
init_tree_stack(&stack, stack_array, 0);
result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
&stack, &iter);
- if (iter.locked_bnode)
- wunlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1908,8 +1966,7 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
result = db_select_delete_tree_common(p, tbl,
tid, pattern, ret, &stack,
&iter);
- if (iter.locked_bnode)
- wunlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1922,8 +1979,7 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
init_root_iterator(&tbl->catree, &iter, 0);
result = db_select_replace_tree_common(p, tbl,
tid, pattern, ret, NULL, &iter);
- if (iter.locked_bnode)
- wunlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1936,8 +1992,7 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
init_root_iterator(&tbl->catree, &iter, 0);
result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
NULL, &iter);
- if (iter.locked_bnode)
- wunlock_base_node(iter.locked_bnode);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1970,9 +2025,9 @@ static void db_print_catree(fmtfn_t to, void *to_arg,
root = catree_find_first_root(&iter);
do {
db_print_tree_common(to, to_arg, show, *root, tbl);
- root = catree_find_next_root(&iter);
+ root = catree_find_next_root(&iter, NULL);
} while (root);
- ASSERT(!iter.locked_bnode);
+ destroy_root_iterator(&iter);
}
/* Release all memory occupied by a single table */
@@ -2055,9 +2110,9 @@ static void db_foreach_offheap_catree(DbTable *tbl,
root = catree_find_first_root(&iter);
do {
db_foreach_offheap_tree_common(*root, func, arg);
- root = catree_find_next_root(&iter);
+ root = catree_find_next_root(&iter, NULL);
} while (root);
- ASSERT(!iter.locked_bnode);
+ destroy_root_iterator(&iter);
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
@@ -2068,7 +2123,7 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
if (res == 0) {
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, current_node, prev_node, current_level);
} else {
/* db_finalize_dbterm_catree will unlock */
handle->lck = prev_node;
@@ -2083,10 +2138,8 @@ static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
DbTableCATree *tb = &(handle->tb->catree);
DbTableCATreeNode *prev_node = handle->lck;
DbTableCATreeNode *current_node = handle->lck2;
- int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->u.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, current_node, prev_node, handle->current_level);
return;
}
@@ -2128,10 +2181,10 @@ void db_catree_force_split(DbTableCATree* tb, int on)
init_root_iterator(tb, &iter, 1);
root = catree_find_first_root(&iter);
do {
- iter.locked_bnode->lock_statistics = (on ? INT_MAX : 0);
- root = catree_find_next_root(&iter);
+ iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0);
+ root = catree_find_next_root(&iter, NULL);
} while (root);
- ASSERT(!iter.locked_bnode);
+ destroy_root_iterator(&iter);
if (on)
tb->common.status |= DB_CATREE_FORCE_SPLIT;
diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h
index feb6f27980..6913609bf8 100644
--- a/erts/emulator/beam/erl_db_catree.h
+++ b/erts/emulator/beam/erl_db_catree.h
@@ -95,8 +95,11 @@ typedef struct db_table_catree {
typedef struct {
DbTableCATree* tb;
Eterm next_route_key;
- DbTableCATreeBaseNode* locked_bnode;
+ DbTableCATreeNode* locked_bnode;
+ DbTableCATreeNode* bnode_parent;
+ int bnode_level;
int read_only;
+ DbRouteKey* search_key;
} CATreeRootIterator;
@@ -109,9 +112,9 @@ TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator*);
TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, CATreeRootIterator*);
TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, CATreeRootIterator*);
-TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next);
-TreeDbTerm** catree_find_next_root(CATreeRootIterator*);
-TreeDbTerm** catree_find_prev_root(CATreeRootIterator*);
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next, Eterm* keyp);
+TreeDbTerm** catree_find_next_root(CATreeRootIterator*, Eterm* keyp);
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator*, Eterm* keyp);
TreeDbTerm** catree_find_first_root(CATreeRootIterator*);
TreeDbTerm** catree_find_last_root(CATreeRootIterator*);
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 250d90e189..854b8d6329 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -1046,7 +1046,7 @@ int db_select_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size);
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
}
else
sc.common.root = &((DbTableTree*)tb)->root;
@@ -1354,7 +1354,7 @@ int db_select_count_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tb->tree.root;
@@ -1765,7 +1765,7 @@ int db_select_delete_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tbl->tree.root;
@@ -2001,7 +2001,7 @@ int db_select_replace_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tbl->tree.root;
@@ -2734,8 +2734,22 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
{
TreeDbTerm *this;
TreeDbTerm *tmp;
+ TreeDbTerm *lastobj;
+ Eterm lastkey;
TreeDbTerm **pp;
- DbTreeStack* stack = get_any_stack(tb,stack_container);
+ DbTreeStack* stack;
+
+ if (iter) {
+ /* Find first non-empty tree */
+ while (!root) {
+ TreeDbTerm** pp = catree_find_next_root(iter, NULL);
+ if (!pp)
+ return NULL;
+ root = *pp;
+ }
+ }
+
+ stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2762,6 +2776,7 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
this = TOP_NODE(stack);
while (stack->slot != slot) {
ASSERT(this);
+ lastobj = this;
if (slot > stack->slot) {
if (this->right != NULL) {
this = this->right;
@@ -2810,14 +2825,19 @@ next_root:
if (!iter)
break; /* EOT */
- pp = catree_find_next_root(iter);
+ ASSERT(slot > stack->slot);
+ if (lastobj) {
+ lastkey = GETKEY(tb, lastobj->dbterm.tpl);
+ lastobj = NULL;
+ }
+ pp = catree_find_next_root(iter, &lastkey);
if (!pp)
break; /* EOT */
root = *pp;
stack->pos = 0;
+ find_next(&tb->common, root, stack, lastkey);
}
-
release_stack(tb,stack_container,stack);
return this;
}
@@ -2952,7 +2972,8 @@ static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
if (iter) {
*rootpp = catree_find_next_from_pb_key_root(key, iter);
- root = *rootpp ? **rootpp : NULL;
+ ASSERT(*rootpp);
+ root = **rootpp;
}
else {
*rootpp = &tbl->tree.root;
@@ -2991,7 +3012,8 @@ static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
if (iter) {
*rootpp = catree_find_prev_from_pb_key_root(key, iter);
- root = *rootpp ? **rootpp : NULL;
+ ASSERT(*rootpp);
+ root = **rootpp;
}
else {
*rootpp = &tbl->tree.root;
@@ -3212,36 +3234,45 @@ static void traverse_backwards(DbTableCommon *tb,
TreeDbTerm *this, *next;
TreeDbTerm** root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
}
- this = TOP_NODE(stack);
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
+ context->root = root;
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
+ } else {
+ next = find_prev(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
if (!((*doit)(tb, this, context, 0)))
return;
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_prev_root(iter);
- lastkey = THE_NON_VALUE;
- } while (root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
+ }
}
/*
@@ -3257,36 +3288,45 @@ static void traverse_forward(DbTableCommon *tb,
TreeDbTerm *this, *next;
TreeDbTerm **root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_next_root(iter, NULL);
+ if (!root)
+ return;
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
- } else {
- next = find_next(tb, *root, stack, lastkey);
+ context->root = root;
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->left;
+ }
+ next = TOP_NODE(stack);
+ } else {
+ next = find_next(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_next(tb, *root, stack, lastkey);
if (!((*doit)(tb, this, context, 1)))
return;
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_next_root(iter);
- lastkey = THE_NON_VALUE;
- } while(root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_next_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_next(tb, *root, stack, lastkey);
+ }
}
/*
@@ -3306,44 +3346,51 @@ static void traverse_update_backwards(DbTableCommon *tb,
TreeDbTerm *this, *next, **this_ptr;
TreeDbTerm** root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ context->root = root;
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
+ }
+ else
+ next = find_prev(tb, *root, stack, lastkey);
- while ((this = next) != NULL) {
+
+ while (1) {
+ while (next) {
+ this = next;
this_ptr = find_ptr(tb, root, stack, this);
ASSERT(this_ptr != NULL);
res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ this = *this_ptr;
+ REPLACE_TOP_NODE(stack, this);
if (!res)
return;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_prev_root(iter);
- lastkey = THE_NON_VALUE;
- } while (root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
+ }
}
static enum ms_key_boundness key_boundness(DbTableCommon *tb,
diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl
index 5352524ee2..de7c647610 100644
--- a/lib/stdlib/test/ets_SUITE.erl
+++ b/lib/stdlib/test/ets_SUITE.erl
@@ -6017,7 +6017,9 @@ smp_select_replace_do(Opts) ->
1 -> Cnt0+1;
0 ->
ets:insert_new(T, {CounterId, 0}),
- Cnt0
+ Cnt0;
+ _ ->
+ erlang:display("Nooooo!")
end,
receive stop ->
[end_of_work | Cnt1]