diff options
author | Sverker Eriksson <[email protected]> | 2018-10-16 21:51:34 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-10-19 19:44:51 +0200 |
commit | c45856710651882d16686be82626ae6058590005 (patch) | |
tree | 8aea5b69efd1141c2d521812221419d4e173aa5a /erts/emulator/beam/erl_db_catree.c | |
parent | c7ea2f9bde2cc3125a12d820cca36582ee1046c5 (diff) | |
download | otp-c45856710651882d16686be82626ae6058590005.tar.gz otp-c45856710651882d16686be82626ae6058590005.tar.bz2 otp-c45856710651882d16686be82626ae6058590005.zip |
erts: Remove tree merging for ets:select*
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 404 |
1 files changed, 353 insertions, 51 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index a8e48bce1b..2c3b262312 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -158,6 +158,7 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, DbUpdateHandle*); static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); + /* ** External interface */ @@ -710,6 +711,40 @@ void unlock_route_node(DbTableCATreeNode *route_node) erts_mtx_unlock(&route_node->u.route.lock); } +static ERTS_INLINE +void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter, + int read_only) +{ + iter->tb = tb; + iter->read_only = read_only; + iter->locked_bnode = NULL; + iter->next_route_key = THE_NON_VALUE; +} + +static ERTS_INLINE +void lock_iter_base_node(CATreeRootIterator* iter, + DbTableCATreeBaseNode *base_node) +{ + ASSERT(!iter->locked_bnode); + if (iter->read_only) + rlock_base_node(base_node); + else + wlock_base_node(base_node); + iter->locked_bnode = base_node; +} + +static ERTS_INLINE +void unlock_iter_base_node(CATreeRootIterator* iter) +{ + ASSERT(iter->locked_bnode); + if (iter->read_only) + runlock_base_node(iter->locked_bnode); + else + wunlock_base_node(iter->locked_bnode); + iter->locked_bnode = NULL; +} + + /* * The following macros are used to create the ETS functions that only @@ -717,6 +752,32 @@ void unlock_route_node(DbTableCATreeNode *route_node) * db_erase_catree). */ + +typedef struct +{ + DbTableCATreeNode *parent; + int current_level; +} FindBaseNode; + +static ERTS_INLINE +DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); + fbn->parent = NULL; + fbn->current_level = 0; + while (!node->is_base_node) { + fbn->current_level++; + fbn->parent = node; + if (cmp_key_route(key, node) < 0) { + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + return &node->u.base; +} + #define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ int retry; \ DbTableCATreeNode *current_node; \ @@ -1675,7 +1736,10 @@ void db_initialize_catree(void) int db_create_catree(Process *p, DbTable *tbl) { DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *root = create_base_node(tb, NULL, NIL); + DbTableCATreeNode *root; + + root = create_base_node(tb, NULL, + NIL);/* lc_key of first base node does not matter */ tb->deletion = 0; tb->base_nodes_to_free_list = NULL; erts_atomic_init_relb(&(tb->root), (erts_aint_t)root); @@ -1781,6 +1845,226 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return erl_db_catree_do_operation_get(tb, key, p, ret); } + + +TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) +{ + FindBaseNode fbn; + DbTableCATreeBaseNode* base_node; + + while (1) { + base_node = find_base_node(iter->tb, key, &fbn); + lock_iter_base_node(iter, base_node); + if (base_node->is_valid) + break; + unlock_iter_base_node(iter); + } + return &base_node->root; +} + + +TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) { + node = GET_LEFT_ACQB(node); + } else { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root || !next_route_node) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + unlock_iter_base_node(iter); + iter->next_route_key = next_route_node->u.route.key.term; + return catree_find_next_root(iter); + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + Eterm key = iter->next_route_key; + + if (iter->locked_bnode) + unlock_iter_base_node(iter); + + if (is_non_value(key)) + return NULL; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (next) { + if (cmp_key_route(key,node) < 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + else { + if (cmp_key_route(key,node) > 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + if (!next_route_node) { + unlock_iter_base_node(iter); + return NULL; + } + key = next_route_node->u.route.key.term; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter) +{ + return catree_find_nextprev_root(iter, 1); +} + +TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter) +{ + return catree_find_nextprev_root(iter, 0); +} + + +TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root || !next_route_node) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + unlock_iter_base_node(iter); + iter->next_route_key = next_route_node->u.route.key.term; + return catree_find_prev_root(iter); + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter, + int first) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + next_route_node = node; + node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node); + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 1); +} + +TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 0); +} + + #define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member, ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS, @@ -1861,25 +2145,28 @@ static int db_select_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_continue_tree_common(p, &tbl->common, + continuation, ret, NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); + return result; } - static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, reverse, ret, - NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret, + NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1889,12 +2176,14 @@ static int db_select_count_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_continue_tree_common(p, tbl, + continuation, ret, NULL, + &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1902,11 +2191,14 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); + return result; } @@ -1915,11 +2207,14 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, int reversed, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, chunk_size, reversed, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_chunk_tree_common(p, tbl, + tid, pattern, chunk_size, reversed, ret, + NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1931,12 +2226,14 @@ static int db_select_delete_continue_catree(Process *p, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root, - continuation, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &stack, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1946,12 +2243,15 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_tree_common(p, tbl, &base->u.base.root, - tid, pattern, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_tree_common(p, tbl, + tid, pattern, ret, &stack, + &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1959,12 +2259,13 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_tree_common(p, &tbl->common, - &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1972,12 +2273,13 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_continue_tree_common(p, tbl, continuation, ret, + NULL, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } |