diff options
author | Sverker Eriksson <[email protected]> | 2018-10-18 11:29:06 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2018-10-23 12:36:29 +0200 |
commit | 77d3d262c5e7d784204a6d91b79ed2f46b4013ad (patch) | |
tree | 14a1b8f87c9ef617a966f95665e253af25a9e8ec /erts | |
parent | 8cd07dae15569deea3a0b539057299a238c8ddc1 (diff) | |
download | otp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.tar.gz otp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.tar.bz2 otp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.zip |
erts: Do contention adaptions during (updating) iterations
Once an iteration key has been found, never fall back to first/last key in
next/prev tree as trees may split or join under our feet. I.e we must always
use previous key when searching for the next key.
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 315 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.h | 11 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 201 |
3 files changed, 315 insertions, 212 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index 975e19a878..072858f3f5 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -158,6 +158,13 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, DbUpdateHandle*); static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); +static void split_catree(DbTableCATree *tb, + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent); +static void join_catree(DbTableCATree *tb, + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent); + /* ** External interface @@ -634,9 +641,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node) * Locks a base node without adjusting the lock statistics */ static ERTS_INLINE -void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node) +void wlock_base_node_no_stats(DbTableCATreeNode *base_node) { - erts_rwmtx_rwlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_rwlock(&base_node->u.base.lock); } /* @@ -644,33 +652,54 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node) * the lock was contended or not */ static ERTS_INLINE -void wlock_base_node(DbTableCATreeBaseNode *base_node) +void wlock_base_node(DbTableCATreeNode *base_node) { - if (try_wlock_base_node(base_node)) { + ASSERT(base_node->is_base_node); + if (try_wlock_base_node(&base_node->u.base)) { /* The lock is contended */ wlock_base_node_no_stats(base_node); - base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; } else { - base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; } } static ERTS_INLINE -void wunlock_base_node(DbTableCATreeBaseNode *base_node) +void wunlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_rwunlock(&base_node->lock); + erts_rwmtx_rwunlock(&base_node->u.base.lock); +} + +static ERTS_INLINE +void wunlock_adapt_base_node(DbTableCATree* tb, + DbTableCATreeNode* base_node, + DbTableCATreeNode* parent, + int current_level) +{ + if (base_node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT + && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { + split_catree(tb, base_node, parent); + } + else if (base_node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { + join_catree(tb, base_node, parent); + } + else { + wunlock_base_node(base_node); + } } static ERTS_INLINE -void rlock_base_node(DbTableCATreeBaseNode *base_node) +void rlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_rlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_rlock(&base_node->u.base.lock); } static ERTS_INLINE -void runlock_base_node(DbTableCATreeBaseNode *base_node) +void runlock_base_node(DbTableCATreeNode *base_node) { - erts_rwmtx_runlock(&base_node->lock); + ASSERT(base_node->is_base_node); + erts_rwmtx_runlock(&base_node->u.base.lock); } static ERTS_INLINE @@ -695,17 +724,23 @@ void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter, iter->read_only = read_only; iter->locked_bnode = NULL; iter->next_route_key = THE_NON_VALUE; + iter->search_key = NULL; } static ERTS_INLINE void lock_iter_base_node(CATreeRootIterator* iter, - DbTableCATreeBaseNode *base_node) + DbTableCATreeNode *base_node, + DbTableCATreeNode *parent, + int current_level) { ASSERT(!iter->locked_bnode); if (iter->read_only) rlock_base_node(base_node); - else + else { wlock_base_node(base_node); + iter->bnode_parent = parent; + iter->bnode_level = current_level; + } iter->locked_bnode = base_node; } @@ -715,12 +750,23 @@ void unlock_iter_base_node(CATreeRootIterator* iter) ASSERT(iter->locked_bnode); if (iter->read_only) runlock_base_node(iter->locked_bnode); + else if (iter->locked_bnode->u.base.is_valid) { + wunlock_adapt_base_node(iter->tb, iter->locked_bnode, + iter->bnode_parent, iter->bnode_level); + } else wunlock_base_node(iter->locked_bnode); iter->locked_bnode = NULL; } - +static ERTS_INLINE +void destroy_root_iterator(CATreeRootIterator* iter) +{ + if (iter->locked_bnode) + unlock_iter_base_node(iter); + if (iter->search_key) + erts_free(ERTS_ALC_T_DB_TMP, iter->search_key); +} /* * The following macros are used to create the ETS functions that only @@ -736,8 +782,8 @@ typedef struct } FindBaseNode; static ERTS_INLINE -DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, - FindBaseNode* fbn) +DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) { DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); fbn->parent = NULL; @@ -751,7 +797,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, node = GET_RIGHT_ACQB(node); } } - return &node->u.base; + return node; } #define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ @@ -776,25 +822,14 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, } \ } \ base_node = ¤t_node->u.base; \ - LOCK(base_node); \ + LOCK(current_node); \ if ( ! base_node->is_valid ) { \ /* Retry */ \ - UNLOCK(base_node); \ + UNLOCK(current_node); \ retry = 1; \ } \ } while(retry); - -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ - if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \ - && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \ - split_catree(tb, prev_node, current_node); \ - } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \ - join_catree(tb, prev_node, current_node); \ - } else { \ - wunlock_base_node(base_node); \ - } - #define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ Eterm key, \ @@ -802,7 +837,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, int result; \ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \ result = SEQUENTAIL_CALL; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ + wunlock_adapt_base_node(tb, current_node, prev_node, current_level);\ return result; \ } @@ -814,7 +849,7 @@ DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, int result; \ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \ result = SEQUENTAIL_CALL; \ - runlock_base_node(base_node); \ + runlock_base_node(current_node); \ return result; \ } @@ -1049,8 +1084,8 @@ void init_tree_stack(DbTreeStack *stack, } static void join_catree(DbTableCATree *tb, - DbTableCATreeNode *parent, - DbTableCATreeNode *thiz) + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent) { DbTableCATreeNode *gparent; DbTableCATreeNode *neighbor; @@ -1060,7 +1095,7 @@ static void join_catree(DbTableCATree *tb, ASSERT(thiz->is_base_node); if (parent == NULL) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } ASSERT(!parent->is_base_node); @@ -1069,12 +1104,12 @@ static void join_catree(DbTableCATree *tb, if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } else if (!neighbor->u.base.is_valid) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); return; } else { lock_route_node(parent); @@ -1120,12 +1155,12 @@ static void join_catree(DbTableCATree *tb, if (try_wlock_base_node(&neighbor->u.base)) { /* Failed to acquire lock */ thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); + wunlock_base_node(thiz); return; } else if (!neighbor->u.base.is_valid) { thiz->u.base.lock_statistics = 0; - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); return; } else { lock_route_node(parent); @@ -1177,8 +1212,8 @@ static void join_catree(DbTableCATree *tb, } else { SET_RIGHT_RELB(neighbor_parent, new_neighbor); } - wunlock_base_node(&thiz->u.base); - wunlock_base_node(&neighbor->u.base); + wunlock_base_node(thiz); + wunlock_base_node(neighbor); /* Free the parent and base */ erts_schedule_db_free(&tb->common, do_free_route_node, @@ -1198,8 +1233,9 @@ static void join_catree(DbTableCATree *tb, } static void split_catree(DbTableCATree *tb, - DbTableCATreeNode* ERTS_RESTRICT parent, - DbTableCATreeNode* ERTS_RESTRICT base) { + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent) +{ TreeDbTerm *splitOutWriteBack; DbTableCATreeNode* ERTS_RESTRICT new_left; DbTableCATreeNode* ERTS_RESTRICT new_right; @@ -1208,7 +1244,7 @@ static void split_catree(DbTableCATree *tb, if (less_than_two_elements(base->u.base.root)) { if (!(tb->common.status & DB_CATREE_FORCE_SPLIT)) base->u.base.lock_statistics = 0; - wunlock_base_node(&base->u.base); + wunlock_base_node(base); return; } else { TreeDbTerm *left_tree; @@ -1234,7 +1270,7 @@ static void split_catree(DbTableCATree *tb, SET_RIGHT_RELB(parent, new_route); } base->u.base.is_valid = 0; - wunlock_base_node(&base->u.base); + wunlock_base_node(base); erts_schedule_db_free(&tb->common, do_free_base_node, base, @@ -1386,14 +1422,13 @@ static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret) init_root_iterator(&tbl->catree, &iter, 1); root = *catree_find_first_root(&iter); if (!root) { - TreeDbTerm **pp = catree_find_next_root(&iter); + TreeDbTerm **pp = catree_find_next_root(&iter, NULL); root = pp ? *pp : NULL; } result = db_first_tree_common(p, tbl, root, ret, NULL); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1407,7 +1442,7 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) init_root_iterator(&tbl->catree, &iter, 1); iter.next_route_key = key; - rootp = catree_find_next_root(&iter); + rootp = catree_find_next_root(&iter, NULL); do { init_tree_stack(&stack, stack_array, 0); @@ -1415,11 +1450,10 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) if (result != DB_ERROR_NONE || *ret != am_EOT) break; - rootp = catree_find_next_root(&iter); + rootp = catree_find_next_root(&iter, NULL); } while (rootp); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1432,14 +1466,13 @@ static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret) init_root_iterator(&tbl->catree, &iter, 1); root = *catree_find_last_root(&iter); if (!root) { - TreeDbTerm **pp = catree_find_prev_root(&iter); + TreeDbTerm **pp = catree_find_prev_root(&iter, NULL); root = pp ? *pp : NULL; } result = db_last_tree_common(p, tbl, root, ret, NULL); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1453,7 +1486,7 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) init_root_iterator(&tbl->catree, &iter, 1); iter.next_route_key = key; - rootp = catree_find_prev_root(&iter); + rootp = catree_find_prev_root(&iter, NULL); do { init_tree_stack(&stack, stack_array, 0); @@ -1461,11 +1494,10 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) &stack); if (result != DB_ERROR_NONE || *ret != am_EOT) break; - rootp = catree_find_prev_root(&iter); + rootp = catree_find_prev_root(&iter, NULL); } while (rootp); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1506,16 +1538,16 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) { FindBaseNode fbn; - DbTableCATreeBaseNode* base_node; + DbTableCATreeNode* base_node; while (1) { base_node = find_base_node(iter->tb, key, &fbn); - lock_iter_base_node(iter, base_node); - if (base_node->is_valid) + lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level); + if (base_node->u.base.is_valid) break; unlock_iter_base_node(iter); } - return &base_node->root; + return &base_node->u.base.root; } @@ -1526,33 +1558,34 @@ TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, DbTableCATreeNode *rejected_base = NULL; #endif DbTableCATreeNode *node; + DbTableCATreeNode *parent; DbTableCATreeNode* next_route_node; + int current_level; ASSERT(!iter->locked_bnode); while (1) { node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; next_route_node = NULL; while (!node->is_base_node) { + current_level++; + parent = node; if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) { + next_route_node = node; node = GET_LEFT_ACQB(node); } else { - next_route_node = node; node = GET_RIGHT_ACQB(node); } } ASSERT(node != rejected_base); - lock_iter_base_node(iter, &node->u.base); + lock_iter_base_node(iter, node, parent, current_level); if (node->u.base.is_valid) { - if (node->u.base.root || !next_route_node) { - iter->next_route_key = (next_route_node ? - next_route_node->u.route.key.term : - THE_NON_VALUE); - return &node->u.base.root; - } - unlock_iter_base_node(iter); - iter->next_route_key = next_route_node->u.route.key.term; - return catree_find_next_root(iter); + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; } /* Retry */ unlock_iter_base_node(iter); @@ -1562,25 +1595,57 @@ TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, } } -TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next) +static Eterm copy_iter_search_key(CATreeRootIterator* iter, Eterm key) +{ + Uint key_size; + + if (is_immed(key)) + return key; + + if (iter->search_key) { + ASSERT(key != iter->search_key->term); + destroy_route_key(iter->search_key); + } + key_size = size_object(key); + if (!iter->search_key || key_size > iter->search_key->size) { + iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP, + iter->search_key, + (offsetof(DbRouteKey, heap) + + key_size*sizeof(Eterm))); + } + copy_route_key(iter->search_key, key, key_size); + return iter->search_key->term; +} + +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next, + Eterm *keyp) { #ifdef DEBUG DbTableCATreeNode *rejected_base = NULL; #endif DbTableCATreeNode *node; + DbTableCATreeNode *parent; DbTableCATreeNode* next_route_node; Eterm key = iter->next_route_key; + int current_level; - if (iter->locked_bnode) + if (iter->locked_bnode) { + if (keyp) + *keyp = copy_iter_search_key(iter, *keyp); unlock_iter_base_node(iter); + } if (is_non_value(key)) return NULL; while (1) { node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; next_route_node = NULL; while (!node->is_base_node) { + current_level++; + parent = node; if (next) { if (cmp_key_route(key,node) < 0) { next_route_node = node; @@ -1599,13 +1664,13 @@ TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next) } } ASSERT(node != rejected_base); - lock_iter_base_node(iter, &node->u.base); + lock_iter_base_node(iter, node, parent, current_level); if (node->u.base.is_valid) { if (node->u.base.root) { iter->next_route_key = (next_route_node ? next_route_node->u.route.key.term : THE_NON_VALUE); - iter->locked_bnode = &node->u.base; + iter->locked_bnode = node; return &node->u.base.root; } if (!next_route_node) { @@ -1622,14 +1687,14 @@ TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next) } } -TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter) +TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp) { - return catree_find_nextprev_root(iter, 1); + return catree_find_nextprev_root(iter, 1, keyp); } -TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter) +TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp) { - return catree_find_nextprev_root(iter, 0); + return catree_find_nextprev_root(iter, 0, keyp); } @@ -1640,14 +1705,20 @@ TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, DbTableCATreeNode *rejected_base = NULL; #endif DbTableCATreeNode *node; + DbTableCATreeNode *parent; DbTableCATreeNode* next_route_node; + int current_level; ASSERT(!iter->locked_bnode); while (1) { node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; next_route_node = NULL; while (!node->is_base_node) { + current_level++; + parent = node; if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) { next_route_node = node; node = GET_RIGHT_ACQB(node); @@ -1656,18 +1727,12 @@ TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, } } ASSERT(node != rejected_base); - lock_iter_base_node(iter, &node->u.base); + lock_iter_base_node(iter, node, parent, current_level); if (node->u.base.is_valid) { - if (node->u.base.root || !next_route_node) { - iter->next_route_key = (next_route_node ? - next_route_node->u.route.key.term : - THE_NON_VALUE); - iter->locked_bnode = &node->u.base; - return &node->u.base.root; - } - unlock_iter_base_node(iter); - iter->next_route_key = next_route_node->u.route.key.term; - return catree_find_prev_root(iter); + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; } /* Retry */ unlock_iter_base_node(iter); @@ -1685,21 +1750,23 @@ static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter, #endif DbTableCATreeNode *node; DbTableCATreeNode* next_route_node; + int current_level; while (1) { node = GET_ROOT_ACQB(iter->tb); + current_level = 0; next_route_node = NULL; while (!node->is_base_node) { + current_level++; next_route_node = node; node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node); } ASSERT(node != rejected_base); - lock_iter_base_node(iter, &node->u.base); + lock_iter_base_node(iter, node, next_route_node, current_level); if (node->u.base.is_valid) { iter->next_route_key = (next_route_node ? next_route_node->u.route.key.term : THE_NON_VALUE); - iter->locked_bnode = &node->u.base; return &node->u.base.root; } /* Retry */ @@ -1792,8 +1859,7 @@ static int db_slot_catree(Process *p, DbTable *tbl, init_root_iterator(&tbl->catree, &iter, 1); result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter), slot_term, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1808,9 +1874,7 @@ static int db_select_continue_catree(Process *p, init_root_iterator(&tbl->catree, &iter, 1); result = db_select_continue_tree_common(p, &tbl->common, continuation, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); - + destroy_root_iterator(&iter); return result; } @@ -1823,8 +1887,7 @@ static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, init_root_iterator(&tbl->catree, &iter, 1); result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1840,8 +1903,7 @@ static int db_select_count_continue_catree(Process *p, result = db_select_count_continue_tree_common(p, tbl, continuation, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1854,9 +1916,7 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, init_root_iterator(&tbl->catree, &iter, 1); result = db_select_count_tree_common(p, tbl, tid, pattern, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); - + destroy_root_iterator(&iter); return result; } @@ -1871,8 +1931,7 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, result = db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size, reversed, ret, NULL, &iter); - if (iter.locked_bnode) - runlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1890,8 +1949,7 @@ static int db_select_delete_continue_catree(Process *p, init_tree_stack(&stack, stack_array, 0); result = db_select_delete_continue_tree_common(p, tbl, continuation, ret, &stack, &iter); - if (iter.locked_bnode) - wunlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1908,8 +1966,7 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, result = db_select_delete_tree_common(p, tbl, tid, pattern, ret, &stack, &iter); - if (iter.locked_bnode) - wunlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1922,8 +1979,7 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, init_root_iterator(&tbl->catree, &iter, 0); result = db_select_replace_tree_common(p, tbl, tid, pattern, ret, NULL, &iter); - if (iter.locked_bnode) - wunlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1936,8 +1992,7 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl, init_root_iterator(&tbl->catree, &iter, 0); result = db_select_replace_continue_tree_common(p, tbl, continuation, ret, NULL, &iter); - if (iter.locked_bnode) - wunlock_base_node(iter.locked_bnode); + destroy_root_iterator(&iter); return result; } @@ -1970,9 +2025,9 @@ static void db_print_catree(fmtfn_t to, void *to_arg, root = catree_find_first_root(&iter); do { db_print_tree_common(to, to_arg, show, *root, tbl); - root = catree_find_next_root(&iter); + root = catree_find_next_root(&iter, NULL); } while (root); - ASSERT(!iter.locked_bnode); + destroy_root_iterator(&iter); } /* Release all memory occupied by a single table */ @@ -2055,9 +2110,9 @@ static void db_foreach_offheap_catree(DbTable *tbl, root = catree_find_first_root(&iter); do { db_foreach_offheap_tree_common(*root, func, arg); - root = catree_find_next_root(&iter); + root = catree_find_next_root(&iter, NULL); } while (root); - ASSERT(!iter.locked_bnode); + destroy_root_iterator(&iter); } static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj, @@ -2068,7 +2123,7 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node); res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL); if (res == 0) { - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + wunlock_adapt_base_node(tb, current_node, prev_node, current_level); } else { /* db_finalize_dbterm_catree will unlock */ handle->lck = prev_node; @@ -2083,10 +2138,8 @@ static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle) DbTableCATree *tb = &(handle->tb->catree); DbTableCATreeNode *prev_node = handle->lck; DbTableCATreeNode *current_node = handle->lck2; - int current_level = handle->current_level; - DbTableCATreeBaseNode *base_node = ¤t_node->u.base; db_finalize_dbterm_tree_common(cret, handle, NULL); - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + wunlock_adapt_base_node(tb, current_node, prev_node, handle->current_level); return; } @@ -2128,10 +2181,10 @@ void db_catree_force_split(DbTableCATree* tb, int on) init_root_iterator(tb, &iter, 1); root = catree_find_first_root(&iter); do { - iter.locked_bnode->lock_statistics = (on ? INT_MAX : 0); - root = catree_find_next_root(&iter); + iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0); + root = catree_find_next_root(&iter, NULL); } while (root); - ASSERT(!iter.locked_bnode); + destroy_root_iterator(&iter); if (on) tb->common.status |= DB_CATREE_FORCE_SPLIT; diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h index feb6f27980..6913609bf8 100644 --- a/erts/emulator/beam/erl_db_catree.h +++ b/erts/emulator/beam/erl_db_catree.h @@ -95,8 +95,11 @@ typedef struct db_table_catree { typedef struct { DbTableCATree* tb; Eterm next_route_key; - DbTableCATreeBaseNode* locked_bnode; + DbTableCATreeNode* locked_bnode; + DbTableCATreeNode* bnode_parent; + int bnode_level; int read_only; + DbRouteKey* search_key; } CATreeRootIterator; @@ -109,9 +112,9 @@ TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator*); TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, CATreeRootIterator*); TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, CATreeRootIterator*); -TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next); -TreeDbTerm** catree_find_next_root(CATreeRootIterator*); -TreeDbTerm** catree_find_prev_root(CATreeRootIterator*); +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next, Eterm* keyp); +TreeDbTerm** catree_find_next_root(CATreeRootIterator*, Eterm* keyp); +TreeDbTerm** catree_find_prev_root(CATreeRootIterator*, Eterm* keyp); TreeDbTerm** catree_find_first_root(CATreeRootIterator*); TreeDbTerm** catree_find_last_root(CATreeRootIterator*); diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 250d90e189..854b8d6329 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -1046,7 +1046,7 @@ int db_select_continue_tree_common(Process *p, if (iter) { iter->next_route_key = lastkey; - sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size); + sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL); } else sc.common.root = &((DbTableTree*)tb)->root; @@ -1354,7 +1354,7 @@ int db_select_count_continue_tree_common(Process *p, if (iter) { iter->next_route_key = lastkey; - sc.common.root = catree_find_prev_root(iter); + sc.common.root = catree_find_prev_root(iter, NULL); } else { sc.common.root = &tb->tree.root; @@ -1765,7 +1765,7 @@ int db_select_delete_continue_tree_common(Process *p, if (iter) { iter->next_route_key = lastkey; - sc.common.root = catree_find_prev_root(iter); + sc.common.root = catree_find_prev_root(iter, NULL); } else { sc.common.root = &tbl->tree.root; @@ -2001,7 +2001,7 @@ int db_select_replace_continue_tree_common(Process *p, if (iter) { iter->next_route_key = lastkey; - sc.common.root = catree_find_prev_root(iter); + sc.common.root = catree_find_prev_root(iter, NULL); } else { sc.common.root = &tbl->tree.root; @@ -2734,8 +2734,22 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, { TreeDbTerm *this; TreeDbTerm *tmp; + TreeDbTerm *lastobj; + Eterm lastkey; TreeDbTerm **pp; - DbTreeStack* stack = get_any_stack(tb,stack_container); + DbTreeStack* stack; + + if (iter) { + /* Find first non-empty tree */ + while (!root) { + TreeDbTerm** pp = catree_find_next_root(iter, NULL); + if (!pp) + return NULL; + root = *pp; + } + } + + stack = get_any_stack(tb,stack_container); ASSERT(stack != NULL); if (slot == 1) { /* Don't search from where we are if we are @@ -2762,6 +2776,7 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, this = TOP_NODE(stack); while (stack->slot != slot) { ASSERT(this); + lastobj = this; if (slot > stack->slot) { if (this->right != NULL) { this = this->right; @@ -2810,14 +2825,19 @@ next_root: if (!iter) break; /* EOT */ - pp = catree_find_next_root(iter); + ASSERT(slot > stack->slot); + if (lastobj) { + lastkey = GETKEY(tb, lastobj->dbterm.tpl); + lastobj = NULL; + } + pp = catree_find_next_root(iter, &lastkey); if (!pp) break; /* EOT */ root = *pp; stack->pos = 0; + find_next(&tb->common, root, stack, lastkey); } - release_stack(tb,stack_container,stack); return this; } @@ -2952,7 +2972,8 @@ static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, if (iter) { *rootpp = catree_find_next_from_pb_key_root(key, iter); - root = *rootpp ? **rootpp : NULL; + ASSERT(*rootpp); + root = **rootpp; } else { *rootpp = &tbl->tree.root; @@ -2991,7 +3012,8 @@ static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, if (iter) { *rootpp = catree_find_prev_from_pb_key_root(key, iter); - root = *rootpp ? **rootpp : NULL; + ASSERT(*rootpp); + root = **rootpp; } else { *rootpp = &tbl->tree.root; @@ -3212,36 +3234,45 @@ static void traverse_backwards(DbTableCommon *tb, TreeDbTerm *this, *next; TreeDbTerm** root = context->root; - do { - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - goto next_root; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; + if (lastkey == THE_NON_VALUE) { + if (iter) { + while (*root == NULL) { + root = catree_find_prev_root(iter, NULL); + if (!root) + return; } - this = TOP_NODE(stack); - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); + context->root = root; } + stack->pos = stack->slot = 0; + next = *root; + while (next != NULL) { + PUSH_NODE(stack, next); + next = next->right; + } + next = TOP_NODE(stack); + } else { + next = find_prev(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + while (1) { + while (next) { + this = next; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_prev(tb, *root, stack, lastkey); if (!((*doit)(tb, this, context, 0))) return; } -next_root: if (!iter) return; - root = context->root = catree_find_prev_root(iter); - lastkey = THE_NON_VALUE; - } while (root); + ASSERT(is_value(lastkey)); + root = catree_find_prev_root(iter, &lastkey); + if (!root) + return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_prev(tb, *root, stack, lastkey); + } } /* @@ -3257,36 +3288,45 @@ static void traverse_forward(DbTableCommon *tb, TreeDbTerm *this, *next; TreeDbTerm **root = context->root; - do { - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - goto next_root; + if (lastkey == THE_NON_VALUE) { + if (iter) { + while (*root == NULL) { + root = catree_find_next_root(iter, NULL); + if (!root) + return; } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->left; - } - this = TOP_NODE(stack); - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; - } else { - next = find_next(tb, *root, stack, lastkey); + context->root = root; } + stack->pos = stack->slot = 0; + next = *root; + while (next != NULL) { + PUSH_NODE(stack, next); + next = next->left; + } + next = TOP_NODE(stack); + } else { + next = find_next(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + while (1) { + while (next) { + this = next; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_next(tb, *root, stack, lastkey); if (!((*doit)(tb, this, context, 1))) return; } -next_root: if (!iter) return; - root = context->root = catree_find_next_root(iter); - lastkey = THE_NON_VALUE; - } while(root); + ASSERT(is_value(lastkey)); + root = catree_find_next_root(iter, &lastkey); + if (!root) + return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_next(tb, *root, stack, lastkey); + } } /* @@ -3306,44 +3346,51 @@ static void traverse_update_backwards(DbTableCommon *tb, TreeDbTerm *this, *next, **this_ptr; TreeDbTerm** root = context->root; - do { - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - goto next_root; + if (lastkey == THE_NON_VALUE) { + if (iter) { + while (*root == NULL) { + root = catree_find_prev_root(iter, NULL); + if (!root) + return; + context->root = root; } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; - } - this = TOP_NODE(stack); - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); } + stack->pos = stack->slot = 0; + next = *root; + while (next) { + PUSH_NODE(stack, next); + next = next->right; + } + next = TOP_NODE(stack); + } + else + next = find_prev(tb, *root, stack, lastkey); - while ((this = next) != NULL) { + + while (1) { + while (next) { + this = next; this_ptr = find_ptr(tb, root, stack, this); ASSERT(this_ptr != NULL); res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + this = *this_ptr; + REPLACE_TOP_NODE(stack, this); if (!res) return; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_prev(tb, *root, stack, lastkey); } -next_root: if (!iter) return; - root = context->root = catree_find_prev_root(iter); - lastkey = THE_NON_VALUE; - } while (root); + ASSERT(is_value(lastkey)); + root = catree_find_prev_root(iter, &lastkey); + if (!root) + return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_prev(tb, *root, stack, lastkey); + } } static enum ms_key_boundness key_boundness(DbTableCommon *tb, |