diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 658 |
1 files changed, 407 insertions, 251 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 4c58c40b9c..71ec6e28e5 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -212,10 +212,16 @@ struct mp_info { Binary *mp; /* The compiled match program */ }; +struct select_common { + TreeDbTerm **root; +}; + + /* * Used by doit_select(_chunk) */ struct select_context { + struct select_common common; Process *p; Eterm accum; Binary *mp; @@ -231,6 +237,7 @@ struct select_context { * Used by doit_select_count */ struct select_count_context { + struct select_common common; Process *p; Binary *mp; Eterm end_condition; @@ -244,9 +251,9 @@ struct select_count_context { * Used by doit_select_delete */ struct select_delete_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; DbTreeStack *stack; Uint accum; Binary *mp; @@ -261,9 +268,9 @@ struct select_delete_context { * Used by doit_select_replace */ struct select_replace_context { + struct select_common common; Process *p; DbTableCommon *tb; - TreeDbTerm **root; Binary *mp; Eterm end_condition; Eterm *lastobj; @@ -298,64 +305,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator*); +typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*, + struct select_common*, int forward); + static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableCommon *tb, - TreeDbTerm *, - void *, - int), - void *context); + traverse_doit_funcT*, + struct select_common *context, + CATreeRootIterator*); static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack*, Eterm lastkey, int (*doit)(DbTableCommon *tb, TreeDbTerm **, // out - void *, + struct select_common*, int), - void *context); + struct select_common*, + CATreeRootIterator*); static enum ms_key_boundness key_boundness(DbTableCommon *tb, Eterm pattern, Eterm *keyp); -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done); static int analyze_pattern(DbTableCommon *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi); static int doit_select(DbTableCommon *tb, - TreeDbTerm *this, - void *ptr, + TreeDbTerm *this, + struct select_common* ptr, int forward); static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, - void *ptr, + struct select_common*, int forward); static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this_ptr, - void *ptr, + struct select_common*, int forward); static int partly_bound_can_match_lesser(Eterm partly_bound_1, @@ -987,10 +992,10 @@ static BIF_RETTYPE bif_trap3(Export *bif, int db_select_continue_tree_common(Process *p, DbTableCommon *tb, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -1004,7 +1009,6 @@ int db_select_continue_tree_common(Process *p, Sint chunk_size; Sint reverse; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple but not the arity or @@ -1038,23 +1042,32 @@ int db_select_continue_tree_common(Process *p, reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); - stack = get_any_stack((DbTable*)tb, stack_container); - if (chunk_size) { - if (reverse) { - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } else { - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); - } - } else { - if (reverse) { - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); - } else { - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); - } + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size); } - release_stack((DbTable*)tb,stack_container,stack); + else + sc.common.root = &((DbTableTree*)tb)->root; + + if (sc.common.root) { + stack = get_any_stack((DbTable*)tb, stack_container); + if (chunk_size) { + if (reverse) { + traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } else { + traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter); + } + } else { + if (reverse) { + traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter); + } else { + traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter); + } + } + release_stack((DbTable*)tb,stack_container,stack); - BUMP_REDS(p, 1000 - sc.max); + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) { if (chunk_size) { @@ -1148,13 +1161,14 @@ static int db_select_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_continue_tree_common(p, &tb->common, + continuation, ret, tb, NULL); } -int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { /* Strategy: Traverse backwards to build resulting list from tail to head */ DbTreeStack* stack; @@ -1185,11 +1199,11 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1202,29 +1216,47 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select(tb, this, &sc, 0 /* direction doesn't matter */); + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(sc.accum,DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) lastkey = GETKEY(tb, this->dbterm.tpl); - } sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } else { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); #ifdef HARDDEBUG @@ -1265,17 +1297,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_tree_common(p, &tb->common, &tb->root, tid, - pattern, reverse, ret, tb); + return db_select_tree_common(p, tbl, tid, + pattern, reverse, ret, &tbl->tree, NULL); } int db_select_count_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tb, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1288,7 +1319,6 @@ int db_select_count_continue_tree_common(Process *p, Eterm *tptr; Eterm egot; - #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple and everything else as @@ -1313,18 +1343,28 @@ int db_select_count_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; if (is_big(tptr[5])) { sc.got = big_to_uint32(tptr[5]); } else { sc.got = unsigned_val(tptr[5]); } - stack = get_any_stack((DbTable*)tb, stack_container); - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tb->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + stack = get_any_stack(tb, stack_container); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); + release_stack(tb,stack_container,stack); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE); @@ -1369,14 +1409,15 @@ static int db_select_count_continue_tree(Process *p, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_count_continue_tree_common(p, tbl, + continuation, ret, tb, NULL); } -int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_count_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_count_context sc; @@ -1391,7 +1432,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root Eterm egot; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1406,10 +1446,10 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1422,21 +1462,32 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &((DbTable*)tb)->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select_count(tb, this, &sc, 0 /* dummy */); + doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */); RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); } stack = get_any_stack((DbTable*)tb, stack_container); if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } - traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); + traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter); release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1477,15 +1528,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_count_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_count_tree_common(p, tbl, + tid, pattern, ret, tb, NULL); } -int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_chunk_tree_common(Process *p, DbTable *tb, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_context sc; @@ -1499,7 +1551,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root int errcode; Eterm mpb; - #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ @@ -1515,11 +1566,11 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tb->common.keypos; sc.got = 0; sc.chunk_size = chunk_size; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1532,9 +1583,13 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(tb, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tb->tree.root; + this = find_node(&tb->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select(tb, this, &sc, 0 /* direction doesn't matter */); + doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */); if (sc.accum != NIL) { hp=HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE); @@ -1546,20 +1601,34 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.least; } - traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } else { if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter); + if (this) + lastkey = GETKEY(tb, this->dbterm.tpl); sc.end_condition = mpi.most; } - traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_first_root(iter); + else + sc.common.root = &tb->tree.root; + } + traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter); } release_stack((DbTable*)tb,stack_container,stack); @@ -1636,18 +1705,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_chunk_tree_common(p, &tb->common, &tb->root, + return db_select_chunk_tree_common(p, tbl, tid, pattern, chunk_size, - reverse, ret, tb); + reverse, ret, tb, NULL); } int db_select_delete_continue_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm continuation, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; unsigned sz; @@ -1659,7 +1728,6 @@ int db_select_delete_continue_tree_common(Process *p, Eterm *tptr; Eterm eaccsum; - #define RET_TO_BIF(Term, State) do { \ if (sc.erase_lastterm) { \ free_term(tbl, sc.lastterm); \ @@ -1682,7 +1750,6 @@ int db_select_delete_continue_tree_common(Process *p, mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); sc.p = p; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; if (is_big(tptr[5])) { sc.accum = big_to_uint32(tptr[5]); @@ -1694,9 +1761,19 @@ int db_select_delete_continue_tree_common(Process *p, sc.max = 1000; sc.keypos = tbl->common.keypos; - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tbl->tree.root; + } - BUMP_REDS(p, 1000 - sc.max); + if (sc.common.root) { + traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter); + + BUMP_REDS(p, 1000 - sc.max); + } if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE); @@ -1738,16 +1815,15 @@ static int db_select_delete_continue_tree(Process *p, { DbTableTree *tb = &tbl->tree; ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); - return db_select_delete_continue_tree_common(p, tbl, &tb->root, - continuation, ret, - &tb->static_stack); + return db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &tb->static_stack, NULL); } int db_select_delete_tree_common(Process *p, DbTable *tbl, - TreeDbTerm **root, Eterm tid, Eterm pattern, Eterm *ret, - DbTreeStack* stack) + DbTreeStack* stack, + CATreeRootIterator* iter) { struct select_delete_context sc; struct mp_info mpi; @@ -1782,7 +1858,6 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, sc.end_condition = NIL; sc.keypos = tbl->common.keypos; sc.tb = &tbl->common; - sc.root = root; sc.stack = stack; if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) { @@ -1798,21 +1873,33 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, if (mpi.key_boundness == MS_KEY_BOUND) { ASSERT(CMP_EQ(mpi.least, mpi.most)); - this = find_node(&tbl->common, *root, mpi.least, NULL); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL); if (this) - doit_select_delete(&tbl->common, this, &sc, 0 /* direction doesn't + doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't matter */); RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); } if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(&tbl->common, this->dbterm.tpl); - } + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(&tbl->common, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); + traverse_backwards(&tbl->common, stack, lastkey, + &doit_select_delete, &sc.common, iter); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { @@ -1856,16 +1943,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; - return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret, - &tb->static_stack); + return db_select_delete_tree_common(p, tbl, tid, pattern, ret, + &tb->static_stack, NULL); } int db_select_replace_continue_tree_common(Process *p, - DbTableCommon *tb, - TreeDbTerm **root, + DbTable *tbl, Eterm continuation, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1902,7 +1989,7 @@ int db_select_replace_continue_tree_common(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; if (is_big(tptr[5])) { sc.replaced = big_to_uint32(tptr[5]); } else { @@ -1910,9 +1997,18 @@ int db_select_replace_continue_tree_common(Process *p, } prev_replaced = sc.replaced; - stack = get_any_stack((DbTable*)tb,stack_container); - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + if (iter) { + iter->next_route_key = lastkey; + sc.common.root = catree_find_prev_root(iter); + } + else { + sc.common.root = &tbl->tree.root; + } + + stack = get_any_stack(tbl, stack_container); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl, stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced))); @@ -1920,7 +2016,7 @@ int db_select_replace_continue_tree_common(Process *p, if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); if (end_condition != NIL && (cmp_partly_bound(end_condition,key) > 0)) { /* done anyway */ @@ -1956,14 +2052,14 @@ static int db_select_replace_continue_tree(Process *p, Eterm continuation, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_continue_tree_common(p, &tb->common, &tb->root, - continuation, ret, tb); + return db_select_replace_continue_tree_common(p, tbl, continuation, ret, + &tbl->tree, NULL); } -int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, +int db_select_replace_tree_common(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret, - DbTableTree *stack_container) + DbTableTree *stack_container, + CATreeRootIterator* iter) { DbTreeStack* stack; struct select_replace_context sc; @@ -1991,14 +2087,13 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro sc.lastobj = NULL; sc.p = p; - sc.tb = tb; - sc.root = root; + sc.tb = &tbl->common; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->keypos; + sc.keypos = tbl->common.keypos; sc.replaced = 0; - if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -2012,32 +2107,44 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro if (mpi.key_boundness == MS_KEY_BOUND) { TreeDbTerm** pp; ASSERT(CMP_EQ(mpi.least, mpi.most)); - pp = find_node2(tb, root, mpi.least); + if (iter) + sc.common.root = catree_find_root(mpi.least, iter); + else + sc.common.root = &tbl->tree.root; + pp = find_node2(&tbl->common, sc.common.root, mpi.least); if (pp) { - doit_select_replace(tb, pp, &sc, 0 /* dummy */); + doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */); reset_static_stack(stack_container); /* may refer replaced term */ } RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - stack = get_any_stack((DbTable*)tb,stack_container); + stack = get_any_stack(tbl,stack_container); if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) { - if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); - } + this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter); + if (this) + lastkey = GETKEY(tbl, this->dbterm.tpl); sc.end_condition = mpi.least; } + else { + ASSERT(mpi.key_boundness == MS_KEY_UNBOUND); + if (iter) + sc.common.root = catree_find_last_root(iter); + else + sc.common.root = &tbl->tree.root; + } - traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); - release_stack((DbTable*)tb,stack_container,stack); + traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace, + &sc.common, iter); + release_stack(tbl,stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced)); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } - key = GETKEY(tb, sc.lastobj); + key = GETKEY(tbl, sc.lastobj); sz = size_object(key); if (IS_USMALL(0, sc.replaced)) { hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); @@ -2070,9 +2177,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { - DbTableTree *tb = &tbl->tree; - return db_select_replace_tree_common(p, &tb->common, &tb->root, - tid, pattern, ret, tb); + return db_select_replace_tree_common(p, tbl, tid, pattern, ret, + &tbl->tree, NULL); } int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, @@ -2809,20 +2915,32 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, return this; } -static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) + +static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; TreeDbTerm *tmp; Sint c; + if (iter) { + *rootpp = catree_find_next_from_pb_key_root(key, iter); + root = *rootpp ? **rootpp : NULL; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) { if (this->right == NULL) { do { tmp = POP_NODE(stack); @@ -2842,20 +2960,31 @@ static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, } } -static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, - DbTreeStack* stack, Eterm key) +static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp, + DbTreeStack* stack, Eterm key, + CATreeRootIterator* iter) { + TreeDbTerm* root; TreeDbTerm *this; TreeDbTerm *tmp; Sint c; + if (iter) { + *rootpp = catree_find_prev_from_pb_key_root(key, iter); + root = *rootpp ? **rootpp : NULL; + } + else { + *rootpp = &tbl->tree.root; + root = tbl->tree.root; + } + /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); - if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) { + if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) { if (this->left == NULL) { do { tmp = POP_NODE(stack); @@ -2866,7 +2995,7 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, return this; } else this = this->left; - } else /*if (c < 0)*/ { + } else /*if (c > 0)*/ { if (this->right == NULL) /* Done */ return this; else @@ -3059,126 +3188,147 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm** root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; - } - this = TOP_NODE(stack); - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); - } + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + this = TOP_NODE(stack); + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 0))) + return; + } else { + next = find_prev(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 0))) - return; - } + while ((this = next) != NULL) { + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 0))) + return; + } + +next_root: + if (!iter) + return; + root = context->root = catree_find_prev_root(iter); + lastkey = THE_NON_VALUE; + } while (root); } /* * Traverse the tree with a callback function, used by db_match_xxx */ static void traverse_forward(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableCommon *, - TreeDbTerm *, - void *, - int), - void *context) + traverse_doit_funcT* doit, + struct select_common *context, + CATreeRootIterator* iter) { TreeDbTerm *this, *next; + TreeDbTerm **root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; - } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->left; - } - this = TOP_NODE(stack); - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; - } else { - next = find_next(tb, *root, stack, lastkey); - } + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->left; + } + this = TOP_NODE(stack); + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 1))) + return; + } else { + next = find_next(tb, *root, stack, lastkey); + } - while ((this = next) != NULL) { - next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); - if (!((*doit)(tb, this, context, 1))) - return; - } + while ((this = next) != NULL) { + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); + if (!((*doit)(tb, this, context, 1))) + return; + } + +next_root: + if (!iter) + return; + root = context->root = catree_find_next_root(iter); + lastkey = THE_NON_VALUE; + } while(root); } /* * Traverse the tree with an update callback function, used by db_select_replace */ static void traverse_update_backwards(DbTableCommon *tb, - TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, int (*doit)(DbTableCommon*, TreeDbTerm**, - void*, + struct select_common*, int), - void* context) + struct select_common* context, + CATreeRootIterator* iter) { int res; TreeDbTerm *this, *next, **this_ptr; + TreeDbTerm** root = context->root; - if (lastkey == THE_NON_VALUE) { - stack->pos = stack->slot = 0; - if (( this = *root ) == NULL) { - return; + do { + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = *root ) == NULL) { + goto next_root; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + this = TOP_NODE(stack); + this_ptr = find_ptr(tb, root, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; + } else { + next = find_prev(tb, *root, stack, lastkey); } - while (this != NULL) { - PUSH_NODE(stack, this); - this = this->right; + + while ((this = next) != NULL) { + this_ptr = find_ptr(tb, root, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; } - this = TOP_NODE(stack); - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) - return; - } else { - next = find_prev(tb, *root, stack, lastkey); - } - while ((this = next) != NULL) { - this_ptr = find_ptr(tb, root, stack, this); - ASSERT(this_ptr != NULL); - res = (*doit)(tb, this_ptr, context, 0); - REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); - if (!res) +next_root: + if (!iter) return; - } + root = context->root = catree_find_prev_root(iter); + lastkey = THE_NON_VALUE; + } while (root); } static enum ms_key_boundness key_boundness(DbTableCommon *tb, @@ -3269,7 +3419,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done) } } -static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) { +Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) +{ int done = 0; Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done); #ifdef HARDDEBUG @@ -3485,7 +3636,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b, * Callback functions for the different match functions */ -static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3520,7 +3672,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_count_context *sc = (struct select_count_context *) ptr; @@ -3544,7 +3697,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, + struct select_common* ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3582,7 +3736,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, } -static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, +static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, + struct select_common *ptr, int forward) { struct select_delete_context *sc = (struct select_delete_context *) ptr; @@ -3601,7 +3756,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { key = GETKEY(sc->tb, this->dbterm.tpl); - linkout_tree(sc->tb, sc->root, key, sc->stack); + linkout_tree(sc->tb, sc->common.root, key, sc->stack); sc->erase_lastterm = 1; ++sc->accum; } @@ -3611,7 +3766,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr, +static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, + struct select_common* ptr, int forward) { struct select_replace_context *sc = (struct select_replace_context *) ptr; |