aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_tree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r--erts/emulator/beam/erl_db_tree.c1081
1 files changed, 654 insertions, 427 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index c6f1a0fc6d..f5fac9dcb6 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -70,8 +70,10 @@
*/
ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
{
- if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
- return &tb->static_stack;
+ if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
+ if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1))
+ return &tb->static_stack;
}
return NULL;
}
@@ -82,9 +84,10 @@ ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
{
DbTreeStack* stack;
- if (stack_container != NULL &&
- !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) {
- return &stack_container->static_stack;
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (!erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1))
+ return &stack_container->static_stack;
}
stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
@@ -96,14 +99,16 @@ static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
{
- if (stack_container != NULL && stack == &stack_container->static_stack) {
- ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
- erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
- }
- else {
- erts_db_free(ERTS_ALC_T_DB_STK, tb,
- (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (stack == &stack_container->static_stack) {
+ ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
+ erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
+ return;
+ }
}
+ erts_db_free(ERTS_ALC_T_DB_STK, tb,
+ (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
}
static ERTS_INLINE void reset_stack(DbTreeStack* stack)
@@ -117,6 +122,7 @@ static ERTS_INLINE void reset_stack(DbTreeStack* stack)
static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
reset_stack(&tb->static_stack);
}
}
@@ -185,31 +191,37 @@ static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
** Datatypes
*/
+enum ms_key_boundness {
+ /* Order significant, larger means more "boundness" => less iteration */
+ MS_KEY_UNBOUND = 0,
+ MS_KEY_PARTIALLY_BOUND = 1,
+ MS_KEY_BOUND = 2,
+ MS_KEY_IMPOSSIBLE = 3
+};
+
/*
* This structure is filled in by analyze_pattern() for the select
* functions.
*/
struct mp_info {
- int something_can_match; /* The match_spec is not "impossible" */
- int some_limitation; /* There is some limitation on the search
- * area, i. e. least and/or most is set.*/
- int got_partial; /* The limitation has a partially bound
- * key */
+ enum ms_key_boundness key_boundness;
Eterm least; /* The lowest matching key (possibly
* partially bound expression) */
Eterm most; /* The highest matching key (possibly
* partially bound expression) */
-
- TreeDbTerm **save_term; /* If the key is completely bound, this
- * will be the Tree node we're searching
- * for, otherwise it will be useless */
Binary *mp; /* The compiled match program */
};
+struct select_common {
+ TreeDbTerm **root;
+};
+
+
/*
* Used by doit_select(_chunk)
*/
struct select_context {
+ struct select_common common;
Process *p;
Eterm accum;
Binary *mp;
@@ -225,6 +237,7 @@ struct select_context {
* Used by doit_select_count
*/
struct select_count_context {
+ struct select_common common;
Process *p;
Binary *mp;
Eterm end_condition;
@@ -238,9 +251,9 @@ struct select_count_context {
* Used by doit_select_delete
*/
struct select_delete_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
DbTreeStack *stack;
Uint accum;
Binary *mp;
@@ -255,9 +268,9 @@ struct select_delete_context {
* Used by doit_select_replace
*/
struct select_replace_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
Binary *mp;
Eterm end_condition;
Eterm *lastobj;
@@ -282,7 +295,8 @@ int tree_balance_left(TreeDbTerm **this);
int tree_balance_right(TreeDbTerm **this);
static int delsub(TreeDbTerm **this);
static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
- DbTable *tb, DbTableTree *stack_container);
+ DbTable *tb, DbTableTree *stack_container,
+ CATreeRootIterator *iter);
static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
Eterm key, DbTableTree *stack_container);
static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
@@ -292,64 +306,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*,
+ struct select_common*, int forward);
+
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *tb,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
int (*doit)(DbTableCommon *tb,
TreeDbTerm **, // out
- void *,
+ struct select_common*,
int),
- void *context);
-static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
- TreeDbTerm ***ret, Eterm *partly_bound);
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+ struct select_common*,
+ CATreeRootIterator*);
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
-static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
static int doit_select(DbTableCommon *tb,
- TreeDbTerm *this,
- void *ptr,
+ TreeDbTerm *this,
+ struct select_common* ptr,
int forward);
static int doit_select_count(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_chunk(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_delete(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_replace(DbTableCommon *tb,
TreeDbTerm **this_ptr,
- void *ptr,
+ struct select_common*,
int forward);
static int partly_bound_can_match_lesser(Eterm partly_bound_1,
@@ -536,7 +548,7 @@ int db_next_tree_common(Process *p, DbTable *tbl,
{
TreeDbTerm *this;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
this = find_next(&tbl->common, root, stack, key);
if (this == NULL) {
@@ -594,7 +606,7 @@ int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
{
TreeDbTerm *this;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
this = find_prev(&tbl->common, root, stack, key);
if (this == NULL) {
@@ -862,7 +874,8 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
Eterm slot_term, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter)
{
Sint slot;
TreeDbTerm *st;
@@ -892,7 +905,7 @@ int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
* are counted from 1 and up.
*/
++slot;
- st = slot_search(p, root, slot, tbl, stack_container);
+ st = slot_search(p, root, slot, tbl, stack_container, iter);
if (st == NULL) {
*ret = am_false;
return DB_ERROR_UNSPEC;
@@ -910,7 +923,7 @@ static int db_slot_tree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb);
+ return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb, NULL);
}
@@ -981,10 +994,10 @@ static BIF_RETTYPE bif_trap3(Export *bif,
int db_select_continue_tree_common(Process *p,
DbTableCommon *tb,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -998,7 +1011,6 @@ int db_select_continue_tree_common(Process *p,
Sint chunk_size;
Sint reverse;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple but not the arity or
@@ -1032,23 +1044,32 @@ int db_select_continue_tree_common(Process *p,
reverse = unsigned_val(tptr[7]);
sc.got = signed_val(tptr[8]);
- stack = get_any_stack((DbTable*)tb, stack_container);
- if (chunk_size) {
- if (reverse) {
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- } else {
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- }
- } else {
- if (reverse) {
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
- } else {
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
- }
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
}
- release_stack((DbTable*)tb,stack_container,stack);
+ else
+ sc.common.root = &((DbTableTree*)tb)->root;
+
+ if (sc.common.root) {
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ if (chunk_size) {
+ if (reverse) {
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ } else {
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ }
+ } else {
+ if (reverse) {
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ } else {
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ }
+ }
+ release_stack((DbTable*)tb,stack_container,stack);
- BUMP_REDS(p, 1000 - sc.max);
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
if (chunk_size) {
@@ -1142,13 +1163,14 @@ static int db_select_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_continue_tree_common(p, &tb->common,
+ continuation, ret, tb, NULL);
}
-int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
/* Strategy: Traverse backwards to build resulting list from tail to head */
DbTreeStack* stack;
@@ -1179,44 +1201,64 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(NIL,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
RET_TO_BIF(sc.accum,DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
lastkey = GETKEY(tb, this->dbterm.tpl);
- }
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
@@ -1257,17 +1299,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_tree_common(p, &tb->common, &tb->root, tid,
- pattern, reverse, ret, tb);
+ return db_select_tree_common(p, tbl, tid,
+ pattern, reverse, ret, &tbl->tree, NULL);
}
int db_select_count_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tb,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1280,7 +1321,6 @@ int db_select_count_continue_tree_common(Process *p,
Eterm *tptr;
Eterm egot;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple and everything else as
@@ -1305,18 +1345,28 @@ int db_select_count_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
if (is_big(tptr[5])) {
sc.got = big_to_uint32(tptr[5]);
} else {
sc.got = unsigned_val(tptr[5]);
}
- stack = get_any_stack((DbTable*)tb, stack_container);
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tb->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ stack = get_any_stack(tb, stack_container);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
+ release_stack(tb,stack_container,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
@@ -1361,14 +1411,15 @@ static int db_select_count_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, tb, NULL);
}
-int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_count_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1383,7 +1434,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
Eterm egot;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1398,35 +1448,48 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &((DbTable*)tb)->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */);
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb, stack_container);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1467,15 +1530,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, tb, NULL);
}
-int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -1489,7 +1553,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
int errcode;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1505,24 +1568,30 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = chunk_size;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(am_EOT,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
if (sc.accum != NIL) {
hp=HAlloc(p, 3);
RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
@@ -1533,21 +1602,35 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
@@ -1624,18 +1707,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_chunk_tree_common(p, &tb->common, &tb->root,
+ return db_select_chunk_tree_common(p, tbl,
tid, pattern, chunk_size,
- reverse, ret, tb);
+ reverse, ret, tb, NULL);
}
int db_select_delete_continue_tree_common(Process *p,
DbTable *tbl,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
unsigned sz;
@@ -1647,7 +1730,6 @@ int db_select_delete_continue_tree_common(Process *p,
Eterm *tptr;
Eterm eaccsum;
-
#define RET_TO_BIF(Term, State) do { \
if (sc.erase_lastterm) { \
free_term(tbl, sc.lastterm); \
@@ -1670,7 +1752,6 @@ int db_select_delete_continue_tree_common(Process *p,
mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
sc.p = p;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
if (is_big(tptr[5])) {
sc.accum = big_to_uint32(tptr[5]);
@@ -1682,9 +1763,19 @@ int db_select_delete_continue_tree_common(Process *p,
sc.max = 1000;
sc.keypos = tbl->common.keypos;
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
@@ -1726,16 +1817,15 @@ static int db_select_delete_continue_tree(Process *p,
{
DbTableTree *tb = &tbl->tree;
ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
- return db_select_delete_continue_tree_common(p, tbl, &tb->root,
- continuation, ret,
- &tb->static_stack);
+ return db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &tb->static_stack, NULL);
}
int db_select_delete_tree_common(Process *p, DbTable *tbl,
- TreeDbTerm **root,
Eterm tid, Eterm pattern,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
struct mp_info mpi;
@@ -1770,35 +1860,48 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl,
sc.end_condition = NIL;
sc.keypos = tbl->common.keypos;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
- if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(0,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't
matter */);
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
}
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, stack, lastkey,
+ &doit_select_delete, &sc.common, iter);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1842,16 +1945,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret,
- &tb->static_stack);
+ return db_select_delete_tree_common(p, tbl, tid, pattern, ret,
+ &tb->static_stack, NULL);
}
int db_select_replace_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tbl,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1888,7 +1991,7 @@ int db_select_replace_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
if (is_big(tptr[5])) {
sc.replaced = big_to_uint32(tptr[5]);
} else {
@@ -1896,9 +1999,18 @@ int db_select_replace_continue_tree_common(Process *p,
}
prev_replaced = sc.replaced;
- stack = get_any_stack((DbTable*)tb,stack_container);
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
+
+ stack = get_any_stack(tbl, stack_container);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl, stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
@@ -1906,7 +2018,7 @@ int db_select_replace_continue_tree_common(Process *p,
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
if (end_condition != NIL &&
(cmp_partly_bound(end_condition,key) > 0)) {
/* done anyway */
@@ -1942,14 +2054,14 @@ static int db_select_replace_continue_tree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ &tbl->tree, NULL);
}
-int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_replace_tree_common(Process *p, DbTable *tbl,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1977,49 +2089,64 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
sc.lastobj = NULL;
sc.p = p;
- sc.tb = tb;
- sc.root = root;
+ sc.tb = &tbl->common;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
sc.replaced = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */);
- reset_static_stack(stack_container); /* may refer replaced term */
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ TreeDbTerm** pp;
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ pp = find_node2(&tbl->common, sc.common.root, mpi.least);
+ if (pp) {
+ doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */);
+ reset_static_stack(stack_container); /* may refer replaced term */
+ }
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- stack = get_any_stack((DbTable*)tb,stack_container);
+ stack = get_any_stack(tbl,stack_container);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tbl, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
sz = size_object(key);
if (IS_USMALL(0, sc.replaced)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -2052,9 +2179,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_replace_tree_common(p, tbl, tid, pattern, ret,
+ &tbl->tree, NULL);
}
int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
@@ -2328,7 +2454,7 @@ static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
-static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
@@ -2339,17 +2465,12 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
Eterm *ptpl;
int i;
int num_heads = 0;
- Eterm key;
- Eterm partly_bound;
- int res;
- Eterm least = 0;
- Eterm most = 0;
+ Eterm least = THE_NON_VALUE;
+ Eterm most = THE_NON_VALUE;
+ enum ms_key_boundness boundness;
- mpi->some_limitation = 1;
- mpi->got_partial = 0;
- mpi->something_can_match = 0;
+ mpi->key_boundness = MS_KEY_IMPOSSIBLE;
mpi->mp = NULL;
- mpi->save_term = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
++num_heads;
@@ -2370,6 +2491,7 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
Eterm match;
Eterm guard;
Eterm body;
+ Eterm key;
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
@@ -2401,30 +2523,29 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
}
++i;
- partly_bound = NIL;
- res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound);
- if ( res >= 0 ) { /* Can match something */
- key = 0;
- mpi->something_can_match = 1;
- if (res > 0) {
- key = GETKEY(tb,tuple_val(tpl));
- } else if (partly_bound != NIL) {
- mpi->got_partial = 1;
- key = partly_bound;
- } else {
- mpi->some_limitation = 0;
- }
- if (key != 0) {
- if (least == 0 ||
- partly_bound_can_match_lesser(key,least)) {
- least = key;
- }
- if (most == 0 ||
- partly_bound_can_match_greater(key,most)) {
- most = key;
- }
- }
- }
+ boundness = key_boundness(tb, tpl, &key);
+ switch (boundness)
+ {
+ case MS_KEY_BOUND:
+ case MS_KEY_PARTIALLY_BOUND:
+ if (is_non_value(least) || partly_bound_can_match_lesser(key,least)) {
+ least = key;
+ }
+ if (is_non_value(most) || partly_bound_can_match_greater(key,most)) {
+ most = key;
+ }
+ break;
+ case MS_KEY_IMPOSSIBLE:
+ case MS_KEY_UNBOUND:
+ break;
+ }
+ if (mpi->key_boundness > boundness)
+ mpi->key_boundness = boundness;
+ }
+
+ if (mpi->key_boundness == MS_KEY_BOUND && !CMP_EQ(least, most)) {
+ /* Several different bound keys */
+ mpi->key_boundness = MS_KEY_PARTIALLY_BOUND;
}
mpi->least = least;
mpi->most = most;
@@ -2608,11 +2729,27 @@ static int delsub(TreeDbTerm **this)
static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
Sint slot, DbTable *tb,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
- DbTreeStack* stack = get_any_stack(tb,stack_container);
+ TreeDbTerm *lastobj;
+ Eterm lastkey;
+ TreeDbTerm **pp;
+ DbTreeStack* stack;
+
+ if (iter) {
+ /* Find first non-empty tree */
+ while (!root) {
+ TreeDbTerm** pp = catree_find_next_root(iter, NULL);
+ if (!pp)
+ return NULL;
+ root = *pp;
+ }
+ }
+
+ stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2624,56 +2761,83 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
are not recorded */
stack->pos = 0;
}
- if (EMPTY_NODE(stack)) {
- this = root;
- if (this == NULL)
- goto done;
- while (this->left != NULL){
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- stack->slot = 1;
- }
- this = TOP_NODE(stack);
- while (stack->slot != slot && this != NULL) {
- if (slot > stack->slot) {
- if (this->right != NULL) {
- this = this->right;
- while (this->left != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->left == tmp)
- break;
- }
- }
- ++(stack->slot);
- } else {
- if (this->left != NULL) {
- this = this->left;
- while (this->right != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->right == tmp)
- break;
- }
- }
- --(stack->slot);
- }
+ while (1) {
+ if (EMPTY_NODE(stack)) {
+ this = root;
+ if (this == NULL)
+ goto next_root;
+ while (this->left != NULL){
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ stack->slot++;
+ }
+ this = TOP_NODE(stack);
+ while (stack->slot != slot) {
+ ASSERT(this);
+ lastobj = this;
+ if (slot > stack->slot) {
+ if (this->right != NULL) {
+ this = this->right;
+ while (this->left != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->left == tmp)
+ break;
+ }
+ }
+ ++(stack->slot);
+ } else {
+ if (this->left != NULL) {
+ this = this->left;
+ while (this->right != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->right == tmp)
+ break;
+ }
+ }
+ --(stack->slot);
+ }
+ }
+ /* Found slot */
+ ASSERT(this);
+ break;
+
+next_root:
+ if (!iter)
+ break; /* EOT */
+
+ ASSERT(slot > stack->slot);
+ if (lastobj) {
+ lastkey = GETKEY(tb, lastobj->dbterm.tpl);
+ lastobj = NULL;
+ }
+ pp = catree_find_next_root(iter, &lastkey);
+ if (!pp)
+ break; /* EOT */
+ root = *pp;
+ stack->pos = 0;
+ find_next(&tb->common, root, stack, lastkey);
}
-done:
+
release_stack(tb,stack_container,stack);
return this;
}
@@ -2708,7 +2872,7 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
this = this->right;
} else if (c < 0) {
if (this->left == NULL) /* Done */
- return this;
+ goto found_next;
else
this = this->left;
} else
@@ -2723,8 +2887,6 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
this = this->left;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- ++(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2733,9 +2895,12 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
return NULL;
}
} while (this->right == tmp);
- if (stack->slot > 0)
- ++(stack->slot);
}
+
+found_next:
+ if (stack->slot > 0)
+ ++(stack->slot);
+
return this;
}
@@ -2765,7 +2930,7 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
this = this->left;
} else if (c > 0) {
if (this->right == NULL) /* Done */
- return this;
+ goto found_prev;
else
this = this->right;
} else
@@ -2780,8 +2945,6 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
this = this->right;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- --(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2790,74 +2953,92 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
return NULL;
}
} while (this->left == tmp);
- if (stack->slot > 0)
- --(stack->slot);
}
+
+found_prev:
+ if (stack->slot > 0)
+ --(stack->slot);
+
return this;
}
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+
+static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_next_from_pb_key_root(key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) {
if (this->right == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->right == tmp);
- return this;
- } else
- this = this->right;
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->right;
} else /*if (c < 0)*/ {
if (this->left == NULL) /* Done */
return this;
- else
- this = this->left;
+ candidate = stack->pos;
+ this = this->left;
}
}
}
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_prev_from_pb_key_root(key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) {
if (this->left == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->left == tmp);
- return this;
- } else
- this = this->left;
- } else /*if (c < 0)*/ {
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->left;
+ } else /*if (c > 0)*/ {
if (this->right == NULL) /* Done */
return this;
- else
- this = this->right;
+ candidate = stack->pos;
+ this = this->right;
}
}
}
@@ -3046,38 +3227,53 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_prev(tb, *root, stack, lastkey);
+ next = find_prev(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
@@ -3085,38 +3281,53 @@ static void traverse_backwards(DbTableCommon *tb,
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm **root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_next_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->left;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_next(tb, *root, stack, lastkey);
+ next = find_next(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_next(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_next_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_next(tb, *root, stack, lastkey);
}
}
@@ -3124,77 +3335,87 @@ static void traverse_forward(DbTableCommon *tb,
* Traverse the tree with an update callback function, used by db_select_replace
*/
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
int (*doit)(DbTableCommon*,
TreeDbTerm**,
- void*,
+ struct select_common*,
int),
- void* context)
+ struct select_common* context,
+ CATreeRootIterator* iter)
{
int res;
TreeDbTerm *this, *next, **this_ptr;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ context->root = root;
+ }
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next) {
+ PUSH_NODE(stack, next);
+ next = next->right;
}
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
+ next = TOP_NODE(stack);
}
+ else
+ next = find_prev(tb, *root, stack, lastkey);
+
- while ((this = next) != NULL) {
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
+ while (1) {
+ while (next) {
+ this = next;
+ this_ptr = find_ptr(tb, root, stack, this);
+ ASSERT(this_ptr != NULL);
+ res = (*doit)(tb, this_ptr, context, 0);
+ this = *this_ptr;
+ REPLACE_TOP_NODE(stack, this);
+ if (!res)
+ return;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
-/*
- * Returns 0 if not given 1 if given and -1 on no possible match
- * if key is given; *ret is set to point to the object concerned.
- */
-static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
- TreeDbTerm ***ret, Eterm *partly_bound)
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp)
{
- TreeDbTerm **this;
Eterm key;
- ASSERT(ret != NULL);
if (pattern == am_Underscore || db_is_variable(pattern) != -1)
- return 0;
+ return MS_KEY_UNBOUND;
key = db_getkey(tb->keypos, pattern);
if (is_non_value(key))
- return -1; /* can't possibly match anything */
+ return MS_KEY_IMPOSSIBLE; /* can't possibly match anything */
if (!db_has_variable(key)) { /* Bound key */
- if (( this = find_node2(tb, root, key) ) == NULL) {
- return -1;
- }
- *ret = this;
- return 1;
- } else if (partly_bound != NULL && key != am_Underscore &&
- db_is_variable(key) < 0 && !db_has_map(key))
- *partly_bound = key;
+ *keyp = key;
+ return MS_KEY_BOUND;
+ } else if (key != am_Underscore &&
+ db_is_variable(key) < 0 && !db_has_map(key)) {
+
+ *keyp = key;
+ return MS_KEY_PARTIALLY_BOUND;
+ }
- return 0;
+ return MS_KEY_UNBOUND;
}
@@ -3262,7 +3483,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
}
}
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) {
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
+{
int done = 0;
Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
#ifdef HARDDEBUG
@@ -3478,7 +3700,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
* Callback functions for the different match functions
*/
-static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3513,7 +3736,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_count_context *sc = (struct select_count_context *) ptr;
@@ -3537,7 +3761,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3575,7 +3800,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
}
-static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common *ptr,
int forward)
{
struct select_delete_context *sc = (struct select_delete_context *) ptr;
@@ -3594,7 +3820,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
key = GETKEY(sc->tb, this->dbterm.tpl);
- linkout_tree(sc->tb, sc->root, key, sc->stack);
+ linkout_tree(sc->tb, sc->common.root, key, sc->stack);
sc->erase_lastterm = 1;
++sc->accum;
}
@@ -3604,7 +3830,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr,
+static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this,
+ struct select_common* ptr,
int forward)
{
struct select_replace_context *sc = (struct select_replace_context *) ptr;