diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 1081 |
1 files changed, 654 insertions, 427 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index c6f1a0fc6d..f5fac9dcb6 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -70,8 +70,10 @@ */ ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb) { - if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) { - return &tb->static_stack; + if (tb != NULL) { + ASSERT(IS_TREE_TABLE(tb->common.type)); + if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) + return &tb->static_stack; } return NULL; } @@ -82,9 +84,10 @@ ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb) static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container) { DbTreeStack* stack; - if (stack_container != NULL && - !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) { - return &stack_container->static_stack; + if (stack_container != NULL) { + ASSERT(IS_TREE_TABLE(stack_container->common.type)); + if (!erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) + return &stack_container->static_stack; } stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED); @@ -96,14 +99,16 @@ static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container) static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack) { - if (stack_container != NULL && stack == &stack_container->static_stack) { - ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1); - erts_atomic_set_relb(&stack_container->is_stack_busy, 0); - } - else { - erts_db_free(ERTS_ALC_T_DB_STK, tb, - (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED); + if (stack_container != NULL) { + ASSERT(IS_TREE_TABLE(stack_container->common.type)); + if (stack == &stack_container->static_stack) { + ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1); + erts_atomic_set_relb(&stack_container->is_stack_busy, 0); + return; + } } + erts_db_free(ERTS_ALC_T_DB_STK, tb, + (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED); } static ERTS_INLINE void reset_stack(DbTreeStack* stack) @@ -117,6 +122,7 @@ static ERTS_INLINE void reset_stack(DbTreeStack* stack) static ERTS_INLINE void reset_static_stack(DbTableTree* tb) { if (tb != NULL) { + ASSERT(IS_TREE_TABLE(tb->common.type)); reset_stack(&tb->static_stack); } } @@ -185,31 +191,37 @@ static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show, ** Datatypes */ +enum ms_key_boundness { + /* Order significant, larger means more "boundness" => less iteration */ + MS_KEY_UNBOUND = 0, + MS_KEY_PARTIALLY_BOUND = 1, + MS_KEY_BOUND = 2, + MS_KEY_IMPOSSIBLE = 3 +}; + /* * This structure is filled in by analyze_pattern() for the select * functions. */ struct mp_info { - int something_can_match; /* The match_spec is not "impossible" */ - int some_limitation; /* There is some limitation on the search - * area, i. e. least and/or most is set.*/ - int got_partial; /* The limitation has a partially bound - * key */ + enum ms_key_boundness key_boundness; Eterm least; /* The lowest matching key (possibly * partially bound expression) */ Eterm most; /* The highest matching key (possibly * partially bound expression) */ - - TreeDbTerm **save_term; /* If the key is completely bound, this - * will be the Tree node we're searching - * for, otherwise it will be useless */ Binary *mp; /* The compiled match program */ }; +struct select_common { + TreeDbTerm **root; +}; + + /* * Used by doit_select(_chunk) */ struct select_context { + struct select_common common; Process *p; Eterm accum; Binary *mp; @@ -225,6 +237,7 @@ struct select_context { * Used by doit_select_count */ struct select_count_context { + struct select_common common; Process *p; Binary *mp; Eterm end_condition; @@ -238,9 +251,9 @@ struct select_count_context { * Used by doit_select_delete */ struct select_delete_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; DbTreeStack *stack; Uint accum; Binary *mp; @@ -255,9 +268,9 @@ struct select_delete_context { * Used by doit_select_replace */ struct select_replace_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; Binary *mp; Eterm end_condition; Eterm *lastobj; @@ -282,7 +295,8 @@ int tree_balance_left(TreeDbTerm **this); int tree_balance_right(TreeDbTerm **this); static int delsub(TreeDbTerm **this); static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot, - DbTable *tb, DbTableTree *stack_container); + DbTable *tb, DbTableTree *stack_container, + CATreeRootIterator *iter); static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root, Eterm key, DbTableTree *stack_container); static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key); @@ -292,64 +306,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*, + struct select_common*, int forward); + static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *tb, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, int (*doit)(DbTableCommon *tb, TreeDbTerm **, // out - void *, + struct select_common*, int), - void *context); -static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, - TreeDbTerm ***ret, Eterm *partly_bound); -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); + struct select_common*, + CATreeRootIterator*); +static enum ms_key_boundness key_boundness(DbTableCommon *tb, + Eterm pattern, Eterm *keyp); static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done); -static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, +static int analyze_pattern(DbTableCommon *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi); static int doit_select(DbTableCommon *tb, - TreeDbTerm *this, - void *ptr, + TreeDbTerm *this, + struct select_common* ptr, int forward); static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this_ptr, - void *ptr, + struct select_common*, int forward); static int partly_bound_can_match_lesser(Eterm partly_bound_1, @@ -536,7 +548,7 @@ int db_next_tree_common(Process *p, DbTable *tbl, { TreeDbTerm *this; - if (is_atom(key) && key == am_EOT) + if (key == am_EOT) return DB_ERROR_BADKEY; this = find_next(&tbl->common, root, stack, key); if (this == NULL) { @@ -594,7 +606,7 @@ int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key, { TreeDbTerm *this; - if (is_atom(key) && key == am_EOT) + if (key == am_EOT) return DB_ERROR_BADKEY; this = find_prev(&tbl->common, root, stack, key); if (this == NULL) { @@ -862,7 +874,8 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret) int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm slot_term, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator *iter) { Sint slot; TreeDbTerm *st; @@ -892,7 +905,7 @@ int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, * are counted from 1 and up. */ ++slot; - st = slot_search(p, root, slot, tbl, stack_container); + st = slot_search(p, root, slot, tbl, stack_container, iter); if (st == NULL) { *ret = am_false; return DB_ERROR_UNSPEC; @@ -910,7 +923,7 @@ static int db_slot_tree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb); + return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb, NULL); } @@ -981,10 +994,10 @@ static BIF_RETTYPE bif_trap3(Export *bif, int db_select_continue_tree_common(Process *p, DbTableCommon *tb, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -998,7 +1011,6 @@ int db_select_continue_tree_common(Process *p, Sint chunk_size; Sint reverse; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple but not the arity or @@ -1032,23 +1044,32 @@ int db_select_continue_tree_common(Process *p, reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); - stack = get_any_stack((DbTable*)tb, stack_container); - if (chunk_size) { - if (reverse) { - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } else { - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } - } else { - if (reverse) { - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); - } else { - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); - } + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL); } - release_stack((DbTable*)tb,stack_container,stack); + else + sc.common.root = &((DbTableTree*)tb)->root; + + if (sc.common.root) { + stack = get_any_stack((DbTable*)tb, stack_container); + if (chunk_size) { + if (reverse) { + traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } else { + traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } + } else { + if (reverse) { + traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter); + } else { + traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter); + } + } + release_stack((DbTable*)tb,stack_container,stack); - BUMP_REDS(p, 1000 - sc.max); + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) { if (chunk_size) { @@ -1142,13 +1163,14 @@ static int db_select_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_continue_tree_common(p, &tb->common, + continuation, ret, tb, NULL); } -int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { /* Strategy: Traverse backwards to build resulting list from tail to head */ DbTreeStack* stack; @@ -1179,44 +1201,64 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = 0; - if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } - if (!mpi.something_can_match) { + if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) { RET_TO_BIF(NIL,DB_ERROR_NONE); /* can't possibly match anything */ } sc.mp = mpi.mp; - if (!mpi.got_partial && mpi.some_limitation && - CMP_EQ(mpi.least,mpi.most)) { - doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */); + if (mpi.key_boundness == MS_KEY_BOUND) { + ASSERT(CMP_EQ(mpi.least, mpi.most)); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); + if (this) + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(sc.accum,DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { - if (mpi.some_limitation) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) lastkey = GETKEY(tb, this->dbterm.tpl); - } sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } else { - if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); #ifdef HARDDEBUG @@ -1257,17 +1299,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_tree_common(p, &tb->common, &tb->root, tid, - pattern, reverse, ret, tb); + return db_select_tree_common(p, tbl, tid, + pattern, reverse, ret, &tbl->tree, NULL); } int db_select_count_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tb, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1280,7 +1321,6 @@ int db_select_count_continue_tree_common(Process *p, Eterm *tptr; Eterm egot; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple and everything else as @@ -1305,18 +1345,28 @@ int db_select_count_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; if (is_big(tptr[5])) { sc.got = big_to_uint32(tptr[5]); } else { sc.got = unsigned_val(tptr[5]); } - stack = get_any_stack((DbTable*)tb, stack_container); - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter, NULL); + } + else { + sc.common.root = &tb->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + stack = get_any_stack(tb, stack_container); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); + release_stack(tb,stack_container,stack); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE); @@ -1361,14 +1411,15 @@ static int db_select_count_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_count_continue_tree_common(p, tbl, + continuation, ret, tb, NULL); } -int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_count_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1383,7 +1434,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root Eterm egot; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1398,35 +1448,48 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; - if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } - if (!mpi.something_can_match) { + if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) { RET_TO_BIF(make_small(0),DB_ERROR_NONE); /* can't possibly match anything */ } sc.mp = mpi.mp; - if (!mpi.got_partial && mpi.some_limitation && - CMP_EQ(mpi.least,mpi.most)) { - doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */); + if (mpi.key_boundness == MS_KEY_BOUND) { + ASSERT(CMP_EQ(mpi.least, mpi.most)); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &((DbTable*)tb)->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); + if (this) + doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */); RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb, stack_container); - if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1467,15 +1530,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_count_tree_common(p, tbl, + tid, pattern, ret, tb, NULL); } -int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_chunk_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -1489,7 +1553,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root int errcode; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1505,24 +1568,30 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = chunk_size; - if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } - if (!mpi.something_can_match) { + if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) { RET_TO_BIF(am_EOT,DB_ERROR_NONE); /* can't possibly match anything */ } sc.mp = mpi.mp; - if (!mpi.got_partial && mpi.some_limitation && - CMP_EQ(mpi.least,mpi.most)) { - doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */); + if (mpi.key_boundness == MS_KEY_BOUND) { + ASSERT(CMP_EQ(mpi.least, mpi.most)); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); + if (this) + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); if (sc.accum != NIL) { hp=HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE); @@ -1533,21 +1602,35 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { - if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } else { - if (mpi.some_limitation) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); @@ -1624,18 +1707,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_chunk_tree_common(p, &tb->common, &tb->root, + return db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size, - reverse, ret, tb); + reverse, ret, tb, NULL); } int db_select_delete_continue_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; unsigned sz; @@ -1647,7 +1730,6 @@ int db_select_delete_continue_tree_common(Process *p, Eterm *tptr; Eterm eaccsum; - #define RET_TO_BIF(Term, State) do { \ if (sc.erase_lastterm) { \ free_term(tbl, sc.lastterm); \ @@ -1670,7 +1752,6 @@ int db_select_delete_continue_tree_common(Process *p, mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); sc.p = p; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; if (is_big(tptr[5])) { sc.accum = big_to_uint32(tptr[5]); @@ -1682,9 +1763,19 @@ int db_select_delete_continue_tree_common(Process *p, sc.max = 1000; sc.keypos = tbl->common.keypos; - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter, NULL); + } + else { + sc.common.root = &tbl->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE); @@ -1726,16 +1817,15 @@ static int db_select_delete_continue_tree(Process *p, { DbTableTree *tb = &tbl->tree; ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); - return db_select_delete_continue_tree_common(p, tbl, &tb->root, - continuation, ret, - &tb->static_stack); + return db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &tb->static_stack, NULL); } int db_select_delete_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm tid, Eterm pattern, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; struct mp_info mpi; @@ -1770,35 +1860,48 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, sc.end_condition = NIL; sc.keypos = tbl->common.keypos; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; - if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(0,errcode); } - if (!mpi.something_can_match) { + if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) { RET_TO_BIF(make_small(0),DB_ERROR_NONE); /* can't possibly match anything */ } sc.mp = mpi.mp; - if (!mpi.got_partial && mpi.some_limitation && - CMP_EQ(mpi.least,mpi.most)) { - doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't + if (mpi.key_boundness == MS_KEY_BOUND) { + ASSERT(CMP_EQ(mpi.least, mpi.most)); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL); + if (this) + doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); } - if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(&tbl->common, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(&tbl->common, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + traverse_backwards(&tbl->common, stack, lastkey, + &doit_select_delete, &sc.common, iter); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1842,16 +1945,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret, - &tb->static_stack); + return db_select_delete_tree_common(p, tbl, tid, pattern, ret, + &tb->static_stack, NULL); } int db_select_replace_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tbl, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1888,7 +1991,7 @@ int db_select_replace_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; if (is_big(tptr[5])) { sc.replaced = big_to_uint32(tptr[5]); } else { @@ -1896,9 +1999,18 @@ int db_select_replace_continue_tree_common(Process *p, } prev_replaced = sc.replaced; - stack = get_any_stack((DbTable*)tb,stack_container); - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter, NULL); + } + else { + sc.common.root = &tbl->tree.root; + } + + stack = get_any_stack(tbl, stack_container); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl, stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced))); @@ -1906,7 +2018,7 @@ int db_select_replace_continue_tree_common(Process *p, if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); if (end_condition != NIL && (cmp_partly_bound(end_condition,key) > 0)) { /* done anyway */ @@ -1942,14 +2054,14 @@ static int db_select_replace_continue_tree(Process *p, Eterm continuation, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_replace_continue_tree_common(p, tbl, continuation, ret, + &tbl->tree, NULL); } -int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_replace_tree_common(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1977,49 +2089,64 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro sc.lastobj = NULL; sc.p = p; - sc.tb = tb; - sc.root = root; + sc.tb = &tbl->common; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; sc.replaced = 0; - if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } - if (!mpi.something_can_match) { + if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) { RET_TO_BIF(make_small(0),DB_ERROR_NONE); /* can't possibly match anything */ } sc.mp = mpi.mp; - if (!mpi.got_partial && mpi.some_limitation && - CMP_EQ(mpi.least,mpi.most)) { - doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */); - reset_static_stack(stack_container); /* may refer replaced term */ + if (mpi.key_boundness == MS_KEY_BOUND) { + TreeDbTerm** pp; + ASSERT(CMP_EQ(mpi.least, mpi.most)); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + pp = find_node2(&tbl->common, sc.common.root, mpi.least); + if (pp) { + doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */); + reset_static_stack(stack_container); /* may refer replaced term */ + } RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - stack = get_any_stack((DbTable*)tb,stack_container); + stack = get_any_stack(tbl,stack_container); - if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tbl, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl,stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced)); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); sz = size_object(key); if (IS_USMALL(0, sc.replaced)) { hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); @@ -2052,9 +2179,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_replace_tree_common(p, tbl, tid, pattern, ret, + &tbl->tree, NULL); } int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, @@ -2328,7 +2454,7 @@ static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root, ** For the select functions, analyzes the pattern and determines which ** part of the tree should be searched. Also compiles the match program */ -static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, +static int analyze_pattern(DbTableCommon *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi) { @@ -2339,17 +2465,12 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, Eterm *ptpl; int i; int num_heads = 0; - Eterm key; - Eterm partly_bound; - int res; - Eterm least = 0; - Eterm most = 0; + Eterm least = THE_NON_VALUE; + Eterm most = THE_NON_VALUE; + enum ms_key_boundness boundness; - mpi->some_limitation = 1; - mpi->got_partial = 0; - mpi->something_can_match = 0; + mpi->key_boundness = MS_KEY_IMPOSSIBLE; mpi->mp = NULL; - mpi->save_term = NULL; for (lst = pattern; is_list(lst); lst = CDR(list_val(lst))) ++num_heads; @@ -2370,6 +2491,7 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, Eterm match; Eterm guard; Eterm body; + Eterm key; ttpl = CAR(list_val(lst)); if (!is_tuple(ttpl)) { @@ -2401,30 +2523,29 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, } ++i; - partly_bound = NIL; - res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound); - if ( res >= 0 ) { /* Can match something */ - key = 0; - mpi->something_can_match = 1; - if (res > 0) { - key = GETKEY(tb,tuple_val(tpl)); - } else if (partly_bound != NIL) { - mpi->got_partial = 1; - key = partly_bound; - } else { - mpi->some_limitation = 0; - } - if (key != 0) { - if (least == 0 || - partly_bound_can_match_lesser(key,least)) { - least = key; - } - if (most == 0 || - partly_bound_can_match_greater(key,most)) { - most = key; - } - } - } + boundness = key_boundness(tb, tpl, &key); + switch (boundness) + { + case MS_KEY_BOUND: + case MS_KEY_PARTIALLY_BOUND: + if (is_non_value(least) || partly_bound_can_match_lesser(key,least)) { + least = key; + } + if (is_non_value(most) || partly_bound_can_match_greater(key,most)) { + most = key; + } + break; + case MS_KEY_IMPOSSIBLE: + case MS_KEY_UNBOUND: + break; + } + if (mpi->key_boundness > boundness) + mpi->key_boundness = boundness; + } + + if (mpi->key_boundness == MS_KEY_BOUND && !CMP_EQ(least, most)) { + /* Several different bound keys */ + mpi->key_boundness = MS_KEY_PARTIALLY_BOUND; } mpi->least = least; mpi->most = most; @@ -2608,11 +2729,27 @@ static int delsub(TreeDbTerm **this) static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot, DbTable *tb, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator *iter) { TreeDbTerm *this; TreeDbTerm *tmp; - DbTreeStack* stack = get_any_stack(tb,stack_container); + TreeDbTerm *lastobj; + Eterm lastkey; + TreeDbTerm **pp; + DbTreeStack* stack; + + if (iter) { + /* Find first non-empty tree */ + while (!root) { + TreeDbTerm** pp = catree_find_next_root(iter, NULL); + if (!pp) + return NULL; + root = *pp; + } + } + + stack = get_any_stack(tb,stack_container); ASSERT(stack != NULL); if (slot == 1) { /* Don't search from where we are if we are @@ -2624,56 +2761,83 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, are not recorded */ stack->pos = 0; } - if (EMPTY_NODE(stack)) { - this = root; - if (this == NULL) - goto done; - while (this->left != NULL){ - PUSH_NODE(stack, this); - this = this->left; - } - PUSH_NODE(stack, this); - stack->slot = 1; - } - this = TOP_NODE(stack); - while (stack->slot != slot && this != NULL) { - if (slot > stack->slot) { - if (this->right != NULL) { - this = this->right; - while (this->left != NULL) { - PUSH_NODE(stack, this); - this = this->left; - } - PUSH_NODE(stack, this); - } else { - for (;;) { - tmp = POP_NODE(stack); - this = TOP_NODE(stack); - if (this == NULL || this->left == tmp) - break; - } - } - ++(stack->slot); - } else { - if (this->left != NULL) { - this = this->left; - while (this->right != NULL) { - PUSH_NODE(stack, this); - this = this->right; - } - PUSH_NODE(stack, this); - } else { - for (;;) { - tmp = POP_NODE(stack); - this = TOP_NODE(stack); - if (this == NULL || this->right == tmp) - break; - } - } - --(stack->slot); - } + while (1) { + if (EMPTY_NODE(stack)) { + this = root; + if (this == NULL) + goto next_root; + while (this->left != NULL){ + PUSH_NODE(stack, this); + this = this->left; + } + PUSH_NODE(stack, this); + stack->slot++; + } + this = TOP_NODE(stack); + while (stack->slot != slot) { + ASSERT(this); + lastobj = this; + if (slot > stack->slot) { + if (this->right != NULL) { + this = this->right; + while (this->left != NULL) { + PUSH_NODE(stack, this); + this = this->left; + } + PUSH_NODE(stack, this); + } else { + for (;;) { + tmp = POP_NODE(stack); + this = TOP_NODE(stack); + if (!this) + goto next_root; + if (this->left == tmp) + break; + } + } + ++(stack->slot); + } else { + if (this->left != NULL) { + this = this->left; + while (this->right != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + PUSH_NODE(stack, this); + } else { + for (;;) { + tmp = POP_NODE(stack); + this = TOP_NODE(stack); + if (!this) + goto next_root; + if (this->right == tmp) + break; + } + } + --(stack->slot); + } + } + /* Found slot */ + ASSERT(this); + break; + +next_root: + if (!iter) + break; /* EOT */ + + ASSERT(slot > stack->slot); + if (lastobj) { + lastkey = GETKEY(tb, lastobj->dbterm.tpl); + lastobj = NULL; + } + pp = catree_find_next_root(iter, &lastkey); + if (!pp) + break; /* EOT */ + root = *pp; + stack->pos = 0; + find_next(&tb->common, root, stack, lastkey); } -done: + release_stack(tb,stack_container,stack); return this; } @@ -2708,7 +2872,7 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, this = this->right; } else if (c < 0) { if (this->left == NULL) /* Done */ - return this; + goto found_next; else this = this->left; } else @@ -2723,8 +2887,6 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, this = this->left; PUSH_NODE(stack, this); } - if (stack->slot > 0) - ++(stack->slot); } else { do { tmp = POP_NODE(stack); @@ -2733,9 +2895,12 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, return NULL; } } while (this->right == tmp); - if (stack->slot > 0) - ++(stack->slot); } + +found_next: + if (stack->slot > 0) + ++(stack->slot); + return this; } @@ -2765,7 +2930,7 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, this = this->left; } else if (c > 0) { if (this->right == NULL) /* Done */ - return this; + goto found_prev; else this = this->right; } else @@ -2780,8 +2945,6 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, this = this->right; PUSH_NODE(stack, this); } - if (stack->slot > 0) - --(stack->slot); } else { do { tmp = POP_NODE(stack); @@ -2790,74 +2953,92 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, return NULL; } } while (this->left == tmp); - if (stack->slot > 0) - --(stack->slot); } + +found_prev: + if (stack->slot > 0) + --(stack->slot); + return this; } -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) + +static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; - TreeDbTerm *tmp; + Uint candidate = 0; Sint c; + if (iter) { + *rootpp = catree_find_next_from_pb_key_root(key, iter); + ASSERT(*rootpp); + root = **rootpp; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) { if (this->right == NULL) { - do { - tmp = POP_NODE(stack); - if (( this = TOP_NODE(stack)) == NULL) { - return NULL; - } - } while (this->right == tmp); - return this; - } else - this = this->right; + stack->pos = candidate; + return TOP_NODE(stack); + } + this = this->right; } else /*if (c < 0)*/ { if (this->left == NULL) /* Done */ return this; - else - this = this->left; + candidate = stack->pos; + this = this->left; } } } -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) +static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; - TreeDbTerm *tmp; + Uint candidate = 0; Sint c; + if (iter) { + *rootpp = catree_find_prev_from_pb_key_root(key, iter); + ASSERT(*rootpp); + root = **rootpp; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) { if (this->left == NULL) { - do { - tmp = POP_NODE(stack); - if (( this = TOP_NODE(stack)) == NULL) { - return NULL; - } - } while (this->left == tmp); - return this; - } else - this = this->left; - } else /*if (c < 0)*/ { + stack->pos = candidate; + return TOP_NODE(stack); + } + this = this->left; + } else /*if (c > 0)*/ { if (this->right == NULL) /* Done */ return this; - else - this = this->right; + candidate = stack->pos; + this = this->right; } } } @@ -3046,38 +3227,53 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm** root = context->root; if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; - } - this = TOP_NODE(stack); - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; + if (iter) { + while (*root == NULL) { + root = catree_find_prev_root(iter, NULL); + if (!root) + return; + } + context->root = root; + } + stack->pos = stack->slot = 0; + next = *root; + while (next != NULL) { + PUSH_NODE(stack, next); + next = next->right; + } + next = TOP_NODE(stack); } else { - next = find_prev(tb, *root, stack, lastkey); + next = find_prev(tb, *root, stack, lastkey); } - while ((this = next) != NULL) { - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; + while (1) { + while (next) { + this = next; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_prev(tb, *root, stack, lastkey); + if (!((*doit)(tb, this, context, 0))) + return; + } + + if (!iter) + return; + ASSERT(is_value(lastkey)); + root = catree_find_prev_root(iter, &lastkey); + if (!root) + return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_prev(tb, *root, stack, lastkey); } } @@ -3085,38 +3281,53 @@ static void traverse_backwards(DbTableCommon *tb, * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm **root = context->root; if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->left; - } - this = TOP_NODE(stack); - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; + if (iter) { + while (*root == NULL) { + root = catree_find_next_root(iter, NULL); + if (!root) + return; + } + context->root = root; + } + stack->pos = stack->slot = 0; + next = *root; + while (next != NULL) { + PUSH_NODE(stack, next); + next = next->left; + } + next = TOP_NODE(stack); } else { - next = find_next(tb, *root, stack, lastkey); + next = find_next(tb, *root, stack, lastkey); } - while ((this = next) != NULL) { - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; + while (1) { + while (next) { + this = next; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_next(tb, *root, stack, lastkey); + if (!((*doit)(tb, this, context, 1))) + return; + } + + if (!iter) + return; + ASSERT(is_value(lastkey)); + root = catree_find_next_root(iter, &lastkey); + if (!root) + return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_next(tb, *root, stack, lastkey); } } @@ -3124,77 +3335,87 @@ static void traverse_forward(DbTableCommon *tb, * Traverse the tree with an update callback function, used by db_select_replace */ static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, int (*doit)(DbTableCommon*, TreeDbTerm**, - void*, + struct select_common*, int), - void* context) + struct select_common* context, + CATreeRootIterator* iter) { int res; TreeDbTerm *this, *next, **this_ptr; + TreeDbTerm** root = context->root; if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; + if (iter) { + while (*root == NULL) { + root = catree_find_prev_root(iter, NULL); + if (!root) + return; + context->root = root; + } } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; + stack->pos = stack->slot = 0; + next = *root; + while (next) { + PUSH_NODE(stack, next); + next = next->right; } - this = TOP_NODE(stack); - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); + next = TOP_NODE(stack); } + else + next = find_prev(tb, *root, stack, lastkey); + - while ((this = next) != NULL) { - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) + while (1) { + while (next) { + this = next; + this_ptr = find_ptr(tb, root, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + this = *this_ptr; + REPLACE_TOP_NODE(stack, this); + if (!res) + return; + lastkey = GETKEY(tb, this->dbterm.tpl); + next = find_prev(tb, *root, stack, lastkey); + } + + if (!iter) + return; + ASSERT(is_value(lastkey)); + root = catree_find_prev_root(iter, &lastkey); + if (!root) return; + context->root = root; + stack->pos = stack->slot = 0; + next = find_prev(tb, *root, stack, lastkey); } } -/* - * Returns 0 if not given 1 if given and -1 on no possible match - * if key is given; *ret is set to point to the object concerned. - */ -static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, - TreeDbTerm ***ret, Eterm *partly_bound) +static enum ms_key_boundness key_boundness(DbTableCommon *tb, + Eterm pattern, Eterm *keyp) { - TreeDbTerm **this; Eterm key; - ASSERT(ret != NULL); if (pattern == am_Underscore || db_is_variable(pattern) != -1) - return 0; + return MS_KEY_UNBOUND; key = db_getkey(tb->keypos, pattern); if (is_non_value(key)) - return -1; /* can't possibly match anything */ + return MS_KEY_IMPOSSIBLE; /* can't possibly match anything */ if (!db_has_variable(key)) { /* Bound key */ - if (( this = find_node2(tb, root, key) ) == NULL) { - return -1; - } - *ret = this; - return 1; - } else if (partly_bound != NULL && key != am_Underscore && - db_is_variable(key) < 0 && !db_has_map(key)) - *partly_bound = key; + *keyp = key; + return MS_KEY_BOUND; + } else if (key != am_Underscore && + db_is_variable(key) < 0 && !db_has_map(key)) { + + *keyp = key; + return MS_KEY_PARTIALLY_BOUND; + } - return 0; + return MS_KEY_UNBOUND; } @@ -3262,7 +3483,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done) } } -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) { +Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) +{ int done = 0; Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done); #ifdef HARDDEBUG @@ -3478,7 +3700,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b, * Callback functions for the different match functions */ -static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3513,7 +3736,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_count_context *sc = (struct select_count_context *) ptr; @@ -3537,7 +3761,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3575,7 +3800,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, } -static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, + struct select_common *ptr, int forward) { struct select_delete_context *sc = (struct select_delete_context *) ptr; @@ -3594,7 +3820,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { key = GETKEY(sc->tb, this->dbterm.tpl); - linkout_tree(sc->tb, sc->root, key, sc->stack); + linkout_tree(sc->tb, sc->common.root, key, sc->stack); sc->erase_lastterm = 1; ++sc->accum; } @@ -3604,7 +3830,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr, +static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, + struct select_common* ptr, int forward) { struct select_replace_context *sc = (struct select_replace_context *) ptr; |