diff options
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 404 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.h | 19 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 658 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree_util.h | 46 |
4 files changed, 805 insertions, 322 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index a8e48bce1b..2c3b262312 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -158,6 +158,7 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, DbUpdateHandle*); static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); + /* ** External interface */ @@ -710,6 +711,40 @@ void unlock_route_node(DbTableCATreeNode *route_node) erts_mtx_unlock(&route_node->u.route.lock); } +static ERTS_INLINE +void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter, + int read_only) +{ + iter->tb = tb; + iter->read_only = read_only; + iter->locked_bnode = NULL; + iter->next_route_key = THE_NON_VALUE; +} + +static ERTS_INLINE +void lock_iter_base_node(CATreeRootIterator* iter, + DbTableCATreeBaseNode *base_node) +{ + ASSERT(!iter->locked_bnode); + if (iter->read_only) + rlock_base_node(base_node); + else + wlock_base_node(base_node); + iter->locked_bnode = base_node; +} + +static ERTS_INLINE +void unlock_iter_base_node(CATreeRootIterator* iter) +{ + ASSERT(iter->locked_bnode); + if (iter->read_only) + runlock_base_node(iter->locked_bnode); + else + wunlock_base_node(iter->locked_bnode); + iter->locked_bnode = NULL; +} + + /* * The following macros are used to create the ETS functions that only @@ -717,6 +752,32 @@ void unlock_route_node(DbTableCATreeNode *route_node) * db_erase_catree). */ + +typedef struct +{ + DbTableCATreeNode *parent; + int current_level; +} FindBaseNode; + +static ERTS_INLINE +DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); + fbn->parent = NULL; + fbn->current_level = 0; + while (!node->is_base_node) { + fbn->current_level++; + fbn->parent = node; + if (cmp_key_route(key, node) < 0) { + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + return &node->u.base; +} + #define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ int retry; \ DbTableCATreeNode *current_node; \ @@ -1675,7 +1736,10 @@ void db_initialize_catree(void) int db_create_catree(Process *p, DbTable *tbl) { DbTableCATree *tb = &tbl->catree; - DbTableCATreeNode *root = create_base_node(tb, NULL, NIL); + DbTableCATreeNode *root; + + root = create_base_node(tb, NULL, + NIL);/* lc_key of first base node does not matter */ tb->deletion = 0; tb->base_nodes_to_free_list = NULL; erts_atomic_init_relb(&(tb->root), (erts_aint_t)root); @@ -1781,6 +1845,226 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return erl_db_catree_do_operation_get(tb, key, p, ret); } + + +TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) +{ + FindBaseNode fbn; + DbTableCATreeBaseNode* base_node; + + while (1) { + base_node = find_base_node(iter->tb, key, &fbn); + lock_iter_base_node(iter, base_node); + if (base_node->is_valid) + break; + unlock_iter_base_node(iter); + } + return &base_node->root; +} + + +TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) { + node = GET_LEFT_ACQB(node); + } else { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root || !next_route_node) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + unlock_iter_base_node(iter); + iter->next_route_key = next_route_node->u.route.key.term; + return catree_find_next_root(iter); + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + Eterm key = iter->next_route_key; + + if (iter->locked_bnode) + unlock_iter_base_node(iter); + + if (is_non_value(key)) + return NULL; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (next) { + if (cmp_key_route(key,node) < 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + else { + if (cmp_key_route(key,node) > 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + if (!next_route_node) { + unlock_iter_base_node(iter); + return NULL; + } + key = next_route_node->u.route.key.term; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter) +{ + return catree_find_nextprev_root(iter, 1); +} + +TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter) +{ + return catree_find_nextprev_root(iter, 0); +} + + +TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + if (node->u.base.root || !next_route_node) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + unlock_iter_base_node(iter); + iter->next_route_key = next_route_node->u.route.key.term; + return catree_find_prev_root(iter); + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter, + int first) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + next_route_node = NULL; + while (!node->is_base_node) { + next_route_node = node; + node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node); + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, &node->u.base); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = &node->u.base; + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 1); +} + +TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 0); +} + + #define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member, ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS, @@ -1861,25 +2145,28 @@ static int db_select_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_continue_tree_common(p, &tbl->common, + continuation, ret, NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); + return result; } - static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, reverse, ret, - NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret, + NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1889,12 +2176,14 @@ static int db_select_count_continue_catree(Process *p, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_continue_tree_common(p, tbl, + continuation, ret, NULL, + &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1902,11 +2191,14 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); + return result; } @@ -1915,11 +2207,14 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, int reversed, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root, - tid, pattern, chunk_size, reversed, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_chunk_tree_common(p, tbl, + tid, pattern, chunk_size, reversed, ret, + NULL, &iter); + if (iter.locked_bnode) + runlock_base_node(iter.locked_bnode); return result; } @@ -1931,12 +2226,14 @@ static int db_select_delete_continue_catree(Process *p, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root, - continuation, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &stack, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1946,12 +2243,15 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; int result; - DbTableCATreeNode *base; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); init_tree_stack(&stack, stack_array, 0); - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_delete_tree_common(p, tbl, &base->u.base.root, - tid, pattern, ret, &stack); - wunlock_base_node(&(base->u.base)); + result = db_select_delete_tree_common(p, tbl, + tid, pattern, ret, &stack, + &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1959,12 +2259,13 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_tree_common(p, &tbl->common, - &base->u.base.root, - tid, pattern, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } @@ -1972,12 +2273,13 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { int result; - DbTableCATreeNode *base; - base = merge_to_one_locked_base_node(&tbl->catree); - result = db_select_replace_continue_tree_common(p, &tbl->common, - &base->u.base.root, - continuation, ret, NULL); - wunlock_base_node(&(base->u.base)); + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_continue_tree_common(p, tbl, continuation, ret, + NULL, &iter); + if (iter.locked_bnode) + wunlock_base_node(iter.locked_bnode); return result; } diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h index f9c0784289..510a9e81d3 100644 --- a/erts/emulator/beam/erl_db_catree.h +++ b/erts/emulator/beam/erl_db_catree.h @@ -92,11 +92,30 @@ typedef struct db_table_catree { int is_routing_nodes_freed; } DbTableCATree; +typedef struct { + DbTableCATree* tb; + Eterm next_route_key; + DbTableCATreeBaseNode* locked_bnode; + int read_only; +} CATreeRootIterator; + + void db_initialize_catree(void); int db_create_catree(Process *p, DbTable *tbl); +TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator*); + +TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, CATreeRootIterator*); +TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, CATreeRootIterator*); +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next); +TreeDbTerm** catree_find_next_root(CATreeRootIterator*); +TreeDbTerm** catree_find_prev_root(CATreeRootIterator*); +TreeDbTerm** catree_find_first_root(CATreeRootIterator*); +TreeDbTerm** catree_find_last_root(CATreeRootIterator*); + + #ifdef ERTS_ENABLE_LOCK_COUNT void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable); #endif diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 4c58c40b9c..71ec6e28e5 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -212,10 +212,16 @@ struct mp_info { Binary *mp; /* The compiled match program */ }; +struct select_common { + TreeDbTerm **root; +}; + + /* * Used by doit_select(_chunk) */ struct select_context { + struct select_common common; Process *p; Eterm accum; Binary *mp; @@ -231,6 +237,7 @@ struct select_context { * Used by doit_select_count */ struct select_count_context { + struct select_common common; Process *p; Binary *mp; Eterm end_condition; @@ -244,9 +251,9 @@ struct select_count_context { * Used by doit_select_delete */ struct select_delete_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; DbTreeStack *stack; Uint accum; Binary *mp; @@ -261,9 +268,9 @@ struct select_delete_context { * Used by doit_select_replace */ struct select_replace_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; Binary *mp; Eterm end_condition; Eterm *lastobj; @@ -298,64 +305,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*, + struct select_common*, int forward); + static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *tb, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, int (*doit)(DbTableCommon *tb, TreeDbTerm **, // out - void *, + struct select_common*, int), - void *context); + struct select_common*, + CATreeRootIterator*); static enum ms_key_boundness key_boundness(DbTableCommon *tb, Eterm pattern, Eterm *keyp); -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done); static int analyze_pattern(DbTableCommon *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi); static int doit_select(DbTableCommon *tb, - TreeDbTerm *this, - void *ptr, + TreeDbTerm *this, + struct select_common* ptr, int forward); static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this_ptr, - void *ptr, + struct select_common*, int forward); static int partly_bound_can_match_lesser(Eterm partly_bound_1, @@ -987,10 +992,10 @@ static BIF_RETTYPE bif_trap3(Export *bif, int db_select_continue_tree_common(Process *p, DbTableCommon *tb, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -1004,7 +1009,6 @@ int db_select_continue_tree_common(Process *p, Sint chunk_size; Sint reverse; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple but not the arity or @@ -1038,23 +1042,32 @@ int db_select_continue_tree_common(Process *p, reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); - stack = get_any_stack((DbTable*)tb, stack_container); - if (chunk_size) { - if (reverse) { - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } else { - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } - } else { - if (reverse) { - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); - } else { - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); - } + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size); } - release_stack((DbTable*)tb,stack_container,stack); + else + sc.common.root = &((DbTableTree*)tb)->root; + + if (sc.common.root) { + stack = get_any_stack((DbTable*)tb, stack_container); + if (chunk_size) { + if (reverse) { + traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } else { + traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } + } else { + if (reverse) { + traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter); + } else { + traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter); + } + } + release_stack((DbTable*)tb,stack_container,stack); - BUMP_REDS(p, 1000 - sc.max); + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) { if (chunk_size) { @@ -1148,13 +1161,14 @@ static int db_select_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_continue_tree_common(p, &tb->common, + continuation, ret, tb, NULL); } -int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { /* Strategy: Traverse backwards to build resulting list from tail to head */ DbTreeStack* stack; @@ -1185,11 +1199,11 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1202,29 +1216,47 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select(tb, this, &sc, 0 /* direction doesn't matter */); + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(sc.accum,DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) lastkey = GETKEY(tb, this->dbterm.tpl); - } sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } else { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); #ifdef HARDDEBUG @@ -1265,17 +1297,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_tree_common(p, &tb->common, &tb->root, tid, - pattern, reverse, ret, tb); + return db_select_tree_common(p, tbl, tid, + pattern, reverse, ret, &tbl->tree, NULL); } int db_select_count_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tb, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1288,7 +1319,6 @@ int db_select_count_continue_tree_common(Process *p, Eterm *tptr; Eterm egot; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple and everything else as @@ -1313,18 +1343,28 @@ int db_select_count_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; if (is_big(tptr[5])) { sc.got = big_to_uint32(tptr[5]); } else { sc.got = unsigned_val(tptr[5]); } - stack = get_any_stack((DbTable*)tb, stack_container); - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tb->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + stack = get_any_stack(tb, stack_container); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); + release_stack(tb,stack_container,stack); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE); @@ -1369,14 +1409,15 @@ static int db_select_count_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_count_continue_tree_common(p, tbl, + continuation, ret, tb, NULL); } -int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_count_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1391,7 +1432,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root Eterm egot; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1406,10 +1446,10 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1422,21 +1462,32 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &((DbTable*)tb)->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select_count(tb, this, &sc, 0 /* dummy */); + doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */); RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb, stack_container); if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1477,15 +1528,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_count_tree_common(p, tbl, + tid, pattern, ret, tb, NULL); } -int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_chunk_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -1499,7 +1551,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root int errcode; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1515,11 +1566,11 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = chunk_size; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1532,9 +1583,13 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select(tb, this, &sc, 0 /* direction doesn't matter */); + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); if (sc.accum != NIL) { hp=HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE); @@ -1546,20 +1601,34 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } else { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); @@ -1636,18 +1705,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_chunk_tree_common(p, &tb->common, &tb->root, + return db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size, - reverse, ret, tb); + reverse, ret, tb, NULL); } int db_select_delete_continue_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; unsigned sz; @@ -1659,7 +1728,6 @@ int db_select_delete_continue_tree_common(Process *p, Eterm *tptr; Eterm eaccsum; - #define RET_TO_BIF(Term, State) do { \ if (sc.erase_lastterm) { \ free_term(tbl, sc.lastterm); \ @@ -1682,7 +1750,6 @@ int db_select_delete_continue_tree_common(Process *p, mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); sc.p = p; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; if (is_big(tptr[5])) { sc.accum = big_to_uint32(tptr[5]); @@ -1694,9 +1761,19 @@ int db_select_delete_continue_tree_common(Process *p, sc.max = 1000; sc.keypos = tbl->common.keypos; - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tbl->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE); @@ -1738,16 +1815,15 @@ static int db_select_delete_continue_tree(Process *p, { DbTableTree *tb = &tbl->tree; ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); - return db_select_delete_continue_tree_common(p, tbl, &tb->root, - continuation, ret, - &tb->static_stack); + return db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &tb->static_stack, NULL); } int db_select_delete_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm tid, Eterm pattern, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; struct mp_info mpi; @@ -1782,7 +1858,6 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, sc.end_condition = NIL; sc.keypos = tbl->common.keypos; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { @@ -1798,21 +1873,33 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(&tbl->common, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select_delete(&tbl->common, this, &sc, 0 /* direction doesn't + doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); } if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(&tbl->common, this->dbterm.tpl); - } + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(&tbl->common, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + traverse_backwards(&tbl->common, stack, lastkey, + &doit_select_delete, &sc.common, iter); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1856,16 +1943,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret, - &tb->static_stack); + return db_select_delete_tree_common(p, tbl, tid, pattern, ret, + &tb->static_stack, NULL); } int db_select_replace_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tbl, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1902,7 +1989,7 @@ int db_select_replace_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; if (is_big(tptr[5])) { sc.replaced = big_to_uint32(tptr[5]); } else { @@ -1910,9 +1997,18 @@ int db_select_replace_continue_tree_common(Process *p, } prev_replaced = sc.replaced; - stack = get_any_stack((DbTable*)tb,stack_container); - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tbl->tree.root; + } + + stack = get_any_stack(tbl, stack_container); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl, stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced))); @@ -1920,7 +2016,7 @@ int db_select_replace_continue_tree_common(Process *p, if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); if (end_condition != NIL && (cmp_partly_bound(end_condition,key) > 0)) { /* done anyway */ @@ -1956,14 +2052,14 @@ static int db_select_replace_continue_tree(Process *p, Eterm continuation, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_replace_continue_tree_common(p, tbl, continuation, ret, + &tbl->tree, NULL); } -int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_replace_tree_common(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1991,14 +2087,13 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro sc.lastobj = NULL; sc.p = p; - sc.tb = tb; - sc.root = root; + sc.tb = &tbl->common; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; sc.replaced = 0; - if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -2012,32 +2107,44 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro if (mpi.key_boundness == MS_KEY_BOUND) { TreeDbTerm** pp; ASSERT(CMP_EQ(mpi.least, mpi.most)); - pp = find_node2(tb, root, mpi.least); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + pp = find_node2(&tbl->common, sc.common.root, mpi.least); if (pp) { - doit_select_replace(tb, pp, &sc, 0 /* dummy */); + doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */); reset_static_stack(stack_container); /* may refer replaced term */ } RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - stack = get_any_stack((DbTable*)tb,stack_container); + stack = get_any_stack(tbl,stack_container); if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tbl, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl,stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced)); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); sz = size_object(key); if (IS_USMALL(0, sc.replaced)) { hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); @@ -2070,9 +2177,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_replace_tree_common(p, tbl, tid, pattern, ret, + &tbl->tree, NULL); } int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, @@ -2809,20 +2915,32 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, return this; } -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) + +static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; TreeDbTerm *tmp; Sint c; + if (iter) { + *rootpp = catree_find_next_from_pb_key_root(key, iter); + root = *rootpp ? **rootpp : NULL; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) { if (this->right == NULL) { do { tmp = POP_NODE(stack); @@ -2842,20 +2960,31 @@ static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, } } -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) +static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; TreeDbTerm *tmp; Sint c; + if (iter) { + *rootpp = catree_find_prev_from_pb_key_root(key, iter); + root = *rootpp ? **rootpp : NULL; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) { if (this->left == NULL) { do { tmp = POP_NODE(stack); @@ -2866,7 +2995,7 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, return this; } else this = this->left; - } else /*if (c < 0)*/ { + } else /*if (c > 0)*/ { if (this->right == NULL) /* Done */ return this; else @@ -3059,126 +3188,147 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm** root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; - } - this = TOP_NODE(stack); - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); - } + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + this = TOP_NODE(stack); + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 0))) + return; + } else { + next = find_prev(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; - } + while ((this = next) != NULL) { + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 0))) + return; + } + +next_root: + if (!iter) + return; + root = context->root = catree_find_prev_root(iter); + lastkey = THE_NON_VALUE; + } while (root); } /* * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm **root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->left; - } - this = TOP_NODE(stack); - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; - } else { - next = find_next(tb, *root, stack, lastkey); - } + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->left; + } + this = TOP_NODE(stack); + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 1))) + return; + } else { + next = find_next(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; - } + while ((this = next) != NULL) { + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 1))) + return; + } + +next_root: + if (!iter) + return; + root = context->root = catree_find_next_root(iter); + lastkey = THE_NON_VALUE; + } while(root); } /* * Traverse the tree with an update callback function, used by db_select_replace */ static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, int (*doit)(DbTableCommon*, TreeDbTerm**, - void*, + struct select_common*, int), - void* context) + struct select_common* context, + CATreeRootIterator* iter) { int res; TreeDbTerm *this, *next, **this_ptr; + TreeDbTerm** root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + this = TOP_NODE(stack); + this_ptr = find_ptr(tb, root, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; + } else { + next = find_prev(tb, *root, stack, lastkey); } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; + + while ((this = next) != NULL) { + this_ptr = find_ptr(tb, root, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; } - this = TOP_NODE(stack); - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); - } - while ((this = next) != NULL) { - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) +next_root: + if (!iter) return; - } + root = context->root = catree_find_prev_root(iter); + lastkey = THE_NON_VALUE; + } while (root); } static enum ms_key_boundness key_boundness(DbTableCommon *tb, @@ -3269,7 +3419,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done) } } -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) { +Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) +{ int done = 0; Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done); #ifdef HARDDEBUG @@ -3485,7 +3636,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b, * Callback functions for the different match functions */ -static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3520,7 +3672,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_count_context *sc = (struct select_count_context *) ptr; @@ -3544,7 +3697,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3582,7 +3736,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, } -static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, + struct select_common *ptr, int forward) { struct select_delete_context *sc = (struct select_delete_context *) ptr; @@ -3601,7 +3756,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { key = GETKEY(sc->tb, this->dbterm.tpl); - linkout_tree(sc->tb, sc->root, key, sc->stack); + linkout_tree(sc->tb, sc->common.root, key, sc->stack); sc->erase_lastterm = 1; ++sc->accum; } @@ -3611,7 +3766,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr, +static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, + struct select_common* ptr, int forward) { struct select_replace_context *sc = (struct select_replace_context *) ptr; diff --git a/erts/emulator/beam/erl_db_tree_util.h b/erts/emulator/beam/erl_db_tree_util.h index fbd9a9124a..c816660c71 100644 --- a/erts/emulator/beam/erl_db_tree_util.h +++ b/erts/emulator/beam/erl_db_tree_util.h @@ -93,48 +93,52 @@ int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object, int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm slot_term, Eterm *ret, DbTableTree *stack_container); -int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_chunk_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret, - DbTableTree *stack_container); -int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + DbTableTree *stack_container, + CATreeRootIterator*); +int db_select_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, int reverse, Eterm *ret, - DbTableTree *stack_container); + DbTableTree *stack_container, + CATreeRootIterator*); int db_select_delete_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm tid, Eterm pattern, Eterm *ret, - DbTreeStack* stack); + DbTreeStack* stack, + CATreeRootIterator* iter); int db_select_continue_tree_common(Process *p, DbTableCommon *tb, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTableTree *stack_container); + DbTableTree *stack_container, + CATreeRootIterator* iter); int db_select_delete_continue_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTreeStack* stack); -int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + DbTreeStack* stack, + CATreeRootIterator* iter); +int db_select_count_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container); + DbTableTree *stack_container, + CATreeRootIterator* iter); int db_select_count_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tb, Eterm continuation, Eterm *ret, - DbTableTree *stack_container); -int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + DbTableTree *stack_container, + CATreeRootIterator* iter); +int db_select_replace_tree_common(Process *p, DbTable*, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container); + DbTableTree *stack_container, + CATreeRootIterator* iter); int db_select_replace_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable*, Eterm continuation, Eterm *ret, - DbTableTree *stack_container); + DbTableTree *stack_container, + CATreeRootIterator* iter); int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret, DbTreeStack *stack /* NULL if no static stack */); @@ -148,4 +152,6 @@ int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, DbTableTree *stack_container); void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle, DbTableTree *stack_container); +Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); + #endif /* _DB_TREE_UTIL_H */ |