aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-10-16 21:51:34 +0200
committerSverker Eriksson <[email protected]>2018-10-19 19:44:51 +0200
commitc45856710651882d16686be82626ae6058590005 (patch)
tree8aea5b69efd1141c2d521812221419d4e173aa5a /erts/emulator/beam/erl_db_catree.c
parentc7ea2f9bde2cc3125a12d820cca36582ee1046c5 (diff)
downloadotp-c45856710651882d16686be82626ae6058590005.tar.gz
otp-c45856710651882d16686be82626ae6058590005.tar.bz2
otp-c45856710651882d16686be82626ae6058590005.zip
erts: Remove tree merging for ets:select*
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c404
1 files changed, 353 insertions, 51 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index a8e48bce1b..2c3b262312 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -158,6 +158,7 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+
/*
** External interface
*/
@@ -710,6 +711,40 @@ void unlock_route_node(DbTableCATreeNode *route_node)
erts_mtx_unlock(&route_node->u.route.lock);
}
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+}
+
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeBaseNode *base_node)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else
+ wlock_base_node(base_node);
+ iter->locked_bnode = base_node;
+}
+
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+
/*
* The following macros are used to create the ETS functions that only
@@ -717,6 +752,32 @@ void unlock_route_node(DbTableCATreeNode *route_node)
* db_erase_catree).
*/
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
+
+static ERTS_INLINE
+DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ fbn->parent = NULL;
+ fbn->current_level = 0;
+ while (!node->is_base_node) {
+ fbn->current_level++;
+ fbn->parent = node;
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return &node->u.base;
+}
+
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
int retry; \
DbTableCATreeNode *current_node; \
@@ -1675,7 +1736,10 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL,
+ NIL);/* lc_key of first base node does not matter */
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1781,6 +1845,226 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return erl_db_catree_do_operation_get(tb, key, p, ret);
}
+
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeBaseNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node);
+ if (base_node->is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->root;
+}
+
+
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root || !next_route_node) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ unlock_iter_base_node(iter);
+ iter->next_route_key = next_route_node->u.route.key.term;
+ return catree_find_next_root(iter);
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ Eterm key = iter->next_route_key;
+
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+
+ if (is_non_value(key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (next) {
+ if (cmp_key_route(key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ key = next_route_node->u.route.key.term;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter)
+{
+ return catree_find_nextprev_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter)
+{
+ return catree_find_nextprev_root(iter, 0);
+}
+
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root || !next_route_node) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ unlock_iter_base_node(iter);
+ iter->next_route_key = next_route_node->u.route.key.term;
+ return catree_find_prev_root(iter);
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
+
+
#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
@@ -1861,25 +2145,28 @@ static int db_select_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
+
return result;
}
-
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, reverse, ret,
- NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1889,12 +2176,14 @@ static int db_select_count_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1902,11 +2191,14 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
+
return result;
}
@@ -1915,11 +2207,14 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
int reversed, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1931,12 +2226,14 @@ static int db_select_delete_continue_catree(Process *p,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
- continuation, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1946,12 +2243,15 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
- tid, pattern, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1959,12 +2259,13 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_tree_common(p, &tbl->common,
- &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1972,12 +2273,13 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}