aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/erl_db_catree.c404
-rw-r--r--erts/emulator/beam/erl_db_catree.h19
-rw-r--r--erts/emulator/beam/erl_db_tree.c658
-rw-r--r--erts/emulator/beam/erl_db_tree_util.h46
4 files changed, 805 insertions, 322 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index a8e48bce1b..2c3b262312 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -158,6 +158,7 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+
/*
** External interface
*/
@@ -710,6 +711,40 @@ void unlock_route_node(DbTableCATreeNode *route_node)
erts_mtx_unlock(&route_node->u.route.lock);
}
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+}
+
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeBaseNode *base_node)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else
+ wlock_base_node(base_node);
+ iter->locked_bnode = base_node;
+}
+
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+
/*
* The following macros are used to create the ETS functions that only
@@ -717,6 +752,32 @@ void unlock_route_node(DbTableCATreeNode *route_node)
* db_erase_catree).
*/
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
+
+static ERTS_INLINE
+DbTableCATreeBaseNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ fbn->parent = NULL;
+ fbn->current_level = 0;
+ while (!node->is_base_node) {
+ fbn->current_level++;
+ fbn->parent = node;
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return &node->u.base;
+}
+
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
int retry; \
DbTableCATreeNode *current_node; \
@@ -1675,7 +1736,10 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL,
+ NIL);/* lc_key of first base node does not matter */
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1781,6 +1845,226 @@ static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return erl_db_catree_do_operation_get(tb, key, p, ret);
}
+
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeBaseNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node);
+ if (base_node->is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->root;
+}
+
+
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root || !next_route_node) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ unlock_iter_base_node(iter);
+ iter->next_route_key = next_route_node->u.route.key.term;
+ return catree_find_next_root(iter);
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ Eterm key = iter->next_route_key;
+
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+
+ if (is_non_value(key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (next) {
+ if (cmp_key_route(key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ key = next_route_node->u.route.key.term;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter)
+{
+ return catree_find_nextprev_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter)
+{
+ return catree_find_nextprev_root(iter, 0);
+}
+
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ if (node->u.base.root || !next_route_node) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ unlock_iter_base_node(iter);
+ iter->next_route_key = next_route_node->u.route.key.term;
+ return catree_find_prev_root(iter);
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, &node->u.base);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = &node->u.base;
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
+
+
#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
@@ -1861,25 +2145,28 @@ static int db_select_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
+
return result;
}
-
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, reverse, ret,
- NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1889,12 +2176,14 @@ static int db_select_count_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1902,11 +2191,14 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
+
return result;
}
@@ -1915,11 +2207,14 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
int reversed, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ runlock_base_node(iter.locked_bnode);
return result;
}
@@ -1931,12 +2226,14 @@ static int db_select_delete_continue_catree(Process *p,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
- continuation, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1946,12 +2243,15 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
- tid, pattern, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1959,12 +2259,13 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_tree_common(p, &tbl->common,
- &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
@@ -1972,12 +2273,13 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ if (iter.locked_bnode)
+ wunlock_base_node(iter.locked_bnode);
return result;
}
diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h
index f9c0784289..510a9e81d3 100644
--- a/erts/emulator/beam/erl_db_catree.h
+++ b/erts/emulator/beam/erl_db_catree.h
@@ -92,11 +92,30 @@ typedef struct db_table_catree {
int is_routing_nodes_freed;
} DbTableCATree;
+typedef struct {
+ DbTableCATree* tb;
+ Eterm next_route_key;
+ DbTableCATreeBaseNode* locked_bnode;
+ int read_only;
+} CATreeRootIterator;
+
+
void db_initialize_catree(void);
int db_create_catree(Process *p, DbTable *tbl);
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator*);
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, CATreeRootIterator*);
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, CATreeRootIterator*);
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next);
+TreeDbTerm** catree_find_next_root(CATreeRootIterator*);
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator*);
+TreeDbTerm** catree_find_first_root(CATreeRootIterator*);
+TreeDbTerm** catree_find_last_root(CATreeRootIterator*);
+
+
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable);
#endif
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 4c58c40b9c..71ec6e28e5 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -212,10 +212,16 @@ struct mp_info {
Binary *mp; /* The compiled match program */
};
+struct select_common {
+ TreeDbTerm **root;
+};
+
+
/*
* Used by doit_select(_chunk)
*/
struct select_context {
+ struct select_common common;
Process *p;
Eterm accum;
Binary *mp;
@@ -231,6 +237,7 @@ struct select_context {
* Used by doit_select_count
*/
struct select_count_context {
+ struct select_common common;
Process *p;
Binary *mp;
Eterm end_condition;
@@ -244,9 +251,9 @@ struct select_count_context {
* Used by doit_select_delete
*/
struct select_delete_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
DbTreeStack *stack;
Uint accum;
Binary *mp;
@@ -261,9 +268,9 @@ struct select_delete_context {
* Used by doit_select_replace
*/
struct select_replace_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
Binary *mp;
Eterm end_condition;
Eterm *lastobj;
@@ -298,64 +305,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*,
+ struct select_common*, int forward);
+
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *tb,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
int (*doit)(DbTableCommon *tb,
TreeDbTerm **, // out
- void *,
+ struct select_common*,
int),
- void *context);
+ struct select_common*,
+ CATreeRootIterator*);
static enum ms_key_boundness key_boundness(DbTableCommon *tb,
Eterm pattern, Eterm *keyp);
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
static int doit_select(DbTableCommon *tb,
- TreeDbTerm *this,
- void *ptr,
+ TreeDbTerm *this,
+ struct select_common* ptr,
int forward);
static int doit_select_count(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_chunk(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_delete(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_replace(DbTableCommon *tb,
TreeDbTerm **this_ptr,
- void *ptr,
+ struct select_common*,
int forward);
static int partly_bound_can_match_lesser(Eterm partly_bound_1,
@@ -987,10 +992,10 @@ static BIF_RETTYPE bif_trap3(Export *bif,
int db_select_continue_tree_common(Process *p,
DbTableCommon *tb,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -1004,7 +1009,6 @@ int db_select_continue_tree_common(Process *p,
Sint chunk_size;
Sint reverse;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple but not the arity or
@@ -1038,23 +1042,32 @@ int db_select_continue_tree_common(Process *p,
reverse = unsigned_val(tptr[7]);
sc.got = signed_val(tptr[8]);
- stack = get_any_stack((DbTable*)tb, stack_container);
- if (chunk_size) {
- if (reverse) {
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- } else {
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- }
- } else {
- if (reverse) {
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
- } else {
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
- }
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size);
}
- release_stack((DbTable*)tb,stack_container,stack);
+ else
+ sc.common.root = &((DbTableTree*)tb)->root;
+
+ if (sc.common.root) {
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ if (chunk_size) {
+ if (reverse) {
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ } else {
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ }
+ } else {
+ if (reverse) {
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ } else {
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ }
+ }
+ release_stack((DbTable*)tb,stack_container,stack);
- BUMP_REDS(p, 1000 - sc.max);
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
if (chunk_size) {
@@ -1148,13 +1161,14 @@ static int db_select_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_continue_tree_common(p, &tb->common,
+ continuation, ret, tb, NULL);
}
-int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
/* Strategy: Traverse backwards to build resulting list from tail to head */
DbTreeStack* stack;
@@ -1185,11 +1199,11 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1202,29 +1216,47 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
if (mpi.key_boundness == MS_KEY_BOUND) {
ASSERT(CMP_EQ(mpi.least, mpi.most));
- this = find_node(tb, *root, mpi.least, NULL);
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
if (this)
- doit_select(tb, this, &sc, 0 /* direction doesn't matter */);
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
RET_TO_BIF(sc.accum,DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
lastkey = GETKEY(tb, this->dbterm.tpl);
- }
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
} else {
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
@@ -1265,17 +1297,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_tree_common(p, &tb->common, &tb->root, tid,
- pattern, reverse, ret, tb);
+ return db_select_tree_common(p, tbl, tid,
+ pattern, reverse, ret, &tbl->tree, NULL);
}
int db_select_count_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tb,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1288,7 +1319,6 @@ int db_select_count_continue_tree_common(Process *p,
Eterm *tptr;
Eterm egot;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple and everything else as
@@ -1313,18 +1343,28 @@ int db_select_count_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
if (is_big(tptr[5])) {
sc.got = big_to_uint32(tptr[5]);
} else {
sc.got = unsigned_val(tptr[5]);
}
- stack = get_any_stack((DbTable*)tb, stack_container);
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter);
+ }
+ else {
+ sc.common.root = &tb->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ stack = get_any_stack(tb, stack_container);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
+ release_stack(tb,stack_container,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
@@ -1369,14 +1409,15 @@ static int db_select_count_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, tb, NULL);
}
-int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_count_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1391,7 +1432,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
Eterm egot;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1406,10 +1446,10 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1422,21 +1462,32 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
if (mpi.key_boundness == MS_KEY_BOUND) {
ASSERT(CMP_EQ(mpi.least, mpi.most));
- this = find_node(tb, *root, mpi.least, NULL);
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &((DbTable*)tb)->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
if (this)
- doit_select_count(tb, this, &sc, 0 /* dummy */);
+ doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */);
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb, stack_container);
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1477,15 +1528,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, tb, NULL);
}
-int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -1499,7 +1551,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
int errcode;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1515,11 +1566,11 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = chunk_size;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1532,9 +1583,13 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
if (mpi.key_boundness == MS_KEY_BOUND) {
ASSERT(CMP_EQ(mpi.least, mpi.most));
- this = find_node(tb, *root, mpi.least, NULL);
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
if (this)
- doit_select(tb, this, &sc, 0 /* direction doesn't matter */);
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
if (sc.accum != NIL) {
hp=HAlloc(p, 3);
RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
@@ -1546,20 +1601,34 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
} else {
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
@@ -1636,18 +1705,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_chunk_tree_common(p, &tb->common, &tb->root,
+ return db_select_chunk_tree_common(p, tbl,
tid, pattern, chunk_size,
- reverse, ret, tb);
+ reverse, ret, tb, NULL);
}
int db_select_delete_continue_tree_common(Process *p,
DbTable *tbl,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
unsigned sz;
@@ -1659,7 +1728,6 @@ int db_select_delete_continue_tree_common(Process *p,
Eterm *tptr;
Eterm eaccsum;
-
#define RET_TO_BIF(Term, State) do { \
if (sc.erase_lastterm) { \
free_term(tbl, sc.lastterm); \
@@ -1682,7 +1750,6 @@ int db_select_delete_continue_tree_common(Process *p,
mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
sc.p = p;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
if (is_big(tptr[5])) {
sc.accum = big_to_uint32(tptr[5]);
@@ -1694,9 +1761,19 @@ int db_select_delete_continue_tree_common(Process *p,
sc.max = 1000;
sc.keypos = tbl->common.keypos;
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
@@ -1738,16 +1815,15 @@ static int db_select_delete_continue_tree(Process *p,
{
DbTableTree *tb = &tbl->tree;
ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
- return db_select_delete_continue_tree_common(p, tbl, &tb->root,
- continuation, ret,
- &tb->static_stack);
+ return db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &tb->static_stack, NULL);
}
int db_select_delete_tree_common(Process *p, DbTable *tbl,
- TreeDbTerm **root,
Eterm tid, Eterm pattern,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
struct mp_info mpi;
@@ -1782,7 +1858,6 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl,
sc.end_condition = NIL;
sc.keypos = tbl->common.keypos;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
@@ -1798,21 +1873,33 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl,
if (mpi.key_boundness == MS_KEY_BOUND) {
ASSERT(CMP_EQ(mpi.least, mpi.most));
- this = find_node(&tbl->common, *root, mpi.least, NULL);
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL);
if (this)
- doit_select_delete(&tbl->common, this, &sc, 0 /* direction doesn't
+ doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't
matter */);
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
}
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
- }
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, stack, lastkey,
+ &doit_select_delete, &sc.common, iter);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1856,16 +1943,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret,
- &tb->static_stack);
+ return db_select_delete_tree_common(p, tbl, tid, pattern, ret,
+ &tb->static_stack, NULL);
}
int db_select_replace_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tbl,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1902,7 +1989,7 @@ int db_select_replace_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
if (is_big(tptr[5])) {
sc.replaced = big_to_uint32(tptr[5]);
} else {
@@ -1910,9 +1997,18 @@ int db_select_replace_continue_tree_common(Process *p,
}
prev_replaced = sc.replaced;
- stack = get_any_stack((DbTable*)tb,stack_container);
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
+
+ stack = get_any_stack(tbl, stack_container);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl, stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
@@ -1920,7 +2016,7 @@ int db_select_replace_continue_tree_common(Process *p,
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
if (end_condition != NIL &&
(cmp_partly_bound(end_condition,key) > 0)) {
/* done anyway */
@@ -1956,14 +2052,14 @@ static int db_select_replace_continue_tree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ &tbl->tree, NULL);
}
-int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_replace_tree_common(Process *p, DbTable *tbl,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1991,14 +2087,13 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
sc.lastobj = NULL;
sc.p = p;
- sc.tb = tb;
- sc.root = root;
+ sc.tb = &tbl->common;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
sc.replaced = 0;
- if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -2012,32 +2107,44 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
if (mpi.key_boundness == MS_KEY_BOUND) {
TreeDbTerm** pp;
ASSERT(CMP_EQ(mpi.least, mpi.most));
- pp = find_node2(tb, root, mpi.least);
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ pp = find_node2(&tbl->common, sc.common.root, mpi.least);
if (pp) {
- doit_select_replace(tb, pp, &sc, 0 /* dummy */);
+ doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */);
reset_static_stack(stack_container); /* may refer replaced term */
}
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- stack = get_any_stack((DbTable*)tb,stack_container);
+ stack = get_any_stack(tbl,stack_container);
if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tbl, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
sz = size_object(key);
if (IS_USMALL(0, sc.replaced)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -2070,9 +2177,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_replace_tree_common(p, tbl, tid, pattern, ret,
+ &tbl->tree, NULL);
}
int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
@@ -2809,20 +2915,32 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
return this;
}
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+
+static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_next_from_pb_key_root(key, iter);
+ root = *rootpp ? **rootpp : NULL;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) {
if (this->right == NULL) {
do {
tmp = POP_NODE(stack);
@@ -2842,20 +2960,31 @@ static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
}
}
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_prev_from_pb_key_root(key, iter);
+ root = *rootpp ? **rootpp : NULL;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) {
if (this->left == NULL) {
do {
tmp = POP_NODE(stack);
@@ -2866,7 +2995,7 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
return this;
} else
this = this->left;
- } else /*if (c < 0)*/ {
+ } else /*if (c > 0)*/ {
if (this->right == NULL) /* Done */
return this;
else
@@ -3059,126 +3188,147 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm** root = context->root;
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
- }
+ do {
+ if (lastkey == THE_NON_VALUE) {
+ stack->pos = stack->slot = 0;
+ if (( this = *root ) == NULL) {
+ goto next_root;
+ }
+ while (this != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ this = TOP_NODE(stack);
+ next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ } else {
+ next = find_prev(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
- }
+ while ((this = next) != NULL) {
+ next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ }
+
+next_root:
+ if (!iter)
+ return;
+ root = context->root = catree_find_prev_root(iter);
+ lastkey = THE_NON_VALUE;
+ } while (root);
}
/*
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm **root = context->root;
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
- } else {
- next = find_next(tb, *root, stack, lastkey);
- }
+ do {
+ if (lastkey == THE_NON_VALUE) {
+ stack->pos = stack->slot = 0;
+ if (( this = *root ) == NULL) {
+ goto next_root;
+ }
+ while (this != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ this = TOP_NODE(stack);
+ next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ } else {
+ next = find_next(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
- }
+ while ((this = next) != NULL) {
+ next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ }
+
+next_root:
+ if (!iter)
+ return;
+ root = context->root = catree_find_next_root(iter);
+ lastkey = THE_NON_VALUE;
+ } while(root);
}
/*
* Traverse the tree with an update callback function, used by db_select_replace
*/
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
int (*doit)(DbTableCommon*,
TreeDbTerm**,
- void*,
+ struct select_common*,
int),
- void* context)
+ struct select_common* context,
+ CATreeRootIterator* iter)
{
int res;
TreeDbTerm *this, *next, **this_ptr;
+ TreeDbTerm** root = context->root;
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
+ do {
+ if (lastkey == THE_NON_VALUE) {
+ stack->pos = stack->slot = 0;
+ if (( this = *root ) == NULL) {
+ goto next_root;
+ }
+ while (this != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ this = TOP_NODE(stack);
+ this_ptr = find_ptr(tb, root, stack, this);
+ ASSERT(this_ptr != NULL);
+ res = (*doit)(tb, this_ptr, context, 0);
+ REPLACE_TOP_NODE(stack, *this_ptr);
+ next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ if (!res)
+ return;
+ } else {
+ next = find_prev(tb, *root, stack, lastkey);
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+
+ while ((this = next) != NULL) {
+ this_ptr = find_ptr(tb, root, stack, this);
+ ASSERT(this_ptr != NULL);
+ res = (*doit)(tb, this_ptr, context, 0);
+ REPLACE_TOP_NODE(stack, *this_ptr);
+ next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ if (!res)
+ return;
}
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
- }
- while ((this = next) != NULL) {
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
+next_root:
+ if (!iter)
return;
- }
+ root = context->root = catree_find_prev_root(iter);
+ lastkey = THE_NON_VALUE;
+ } while (root);
}
static enum ms_key_boundness key_boundness(DbTableCommon *tb,
@@ -3269,7 +3419,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
}
}
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) {
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
+{
int done = 0;
Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
#ifdef HARDDEBUG
@@ -3485,7 +3636,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
* Callback functions for the different match functions
*/
-static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3520,7 +3672,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_count_context *sc = (struct select_count_context *) ptr;
@@ -3544,7 +3697,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3582,7 +3736,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
}
-static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common *ptr,
int forward)
{
struct select_delete_context *sc = (struct select_delete_context *) ptr;
@@ -3601,7 +3756,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
key = GETKEY(sc->tb, this->dbterm.tpl);
- linkout_tree(sc->tb, sc->root, key, sc->stack);
+ linkout_tree(sc->tb, sc->common.root, key, sc->stack);
sc->erase_lastterm = 1;
++sc->accum;
}
@@ -3611,7 +3766,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr,
+static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this,
+ struct select_common* ptr,
int forward)
{
struct select_replace_context *sc = (struct select_replace_context *) ptr;
diff --git a/erts/emulator/beam/erl_db_tree_util.h b/erts/emulator/beam/erl_db_tree_util.h
index fbd9a9124a..c816660c71 100644
--- a/erts/emulator/beam/erl_db_tree_util.h
+++ b/erts/emulator/beam/erl_db_tree_util.h
@@ -93,48 +93,52 @@ int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
Eterm slot_term, Eterm *ret,
DbTableTree *stack_container);
-int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret,
- DbTableTree *stack_container);
-int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTableTree *stack_container,
+ CATreeRootIterator*);
+int db_select_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, int reverse, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator*);
int db_select_delete_tree_common(Process *p, DbTable *tbl,
- TreeDbTerm **root,
Eterm tid, Eterm pattern,
Eterm *ret,
- DbTreeStack* stack);
+ DbTreeStack* stack,
+ CATreeRootIterator* iter);
int db_select_continue_tree_common(Process *p,
DbTableCommon *tb,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_delete_continue_tree_common(Process *p,
DbTable *tbl,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTreeStack* stack);
-int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack* stack,
+ CATreeRootIterator* iter);
+int db_select_count_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_count_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tb,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
-int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
+int db_select_replace_tree_common(Process *p, DbTable*,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_replace_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable*,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
Eterm key, Eterm *ret,
DbTreeStack *stack /* NULL if no static stack */);
@@ -148,4 +152,6 @@ int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
DbTableTree *stack_container);
void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
DbTableTree *stack_container);
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+
#endif /* _DB_TREE_UTIL_H */