aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_tree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r--erts/emulator/beam/erl_db_tree.c1813
1 files changed, 1158 insertions, 655 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 8c5fc0acb2..19b94b0634 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -48,33 +48,23 @@
#include "erl_binary.h"
#include "erl_db_tree.h"
+#include "erl_db_tree_util.h"
#define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
-#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
-/*
-** A stack of this size is enough for an AVL tree with more than
-** 0xFFFFFFFF elements. May be subject to change if
-** the datatype of the element counter is changed to a 64 bit integer.
-** The Maximal height of an AVL tree is calculated as:
-** h(n) <= 1.4404 * log(n + 2) - 0.328
-** Where n denotes the number of nodes, h(n) the height of the tree
-** with n nodes and log is the binary logarithm.
-*/
-
-#define STACK_NEED 50
-#define TREE_MAX_ELEMENTS 0xFFFFFFFFUL
-
-#define PUSH_NODE(Dtt, Tdt) \
- ((Dtt)->array[(Dtt)->pos++] = Tdt)
-
-#define POP_NODE(Dtt) \
- (((Dtt)->pos) ? \
- (Dtt)->array[--((Dtt)->pos)] : NULL)
-
-#define TOP_NODE(Dtt) \
- ((Dtt->pos) ? \
- (Dtt)->array[(Dtt)->pos - 1] : NULL)
+#define NITEMS_CENTRALIZED(tb) \
+ ((Sint)erts_flxctr_read_centralized(&(tb)->common.counters, \
+ ERTS_DB_TABLE_NITEMS_COUNTER_ID))
+#define ADD_NITEMS(DB, TO_ADD) \
+ erts_flxctr_add(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID, TO_ADD)
+#define INC_NITEMS(DB) \
+ erts_flxctr_inc(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
+#define INC_NITEMS_CENTRALIZED(DB) \
+ erts_flxctr_inc_read_centralized(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
+#define RESET_NITEMS(DB) \
+ erts_flxctr_reset(&(DB)->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID)
+#define IS_CENTRALIZED_CTR(tb) (!(tb)->common.counters.is_decentralized)
+#define APPROX_MEM_CONSUMED(tb) erts_flxctr_read_approx(&(tb)->common.counters, ERTS_DB_TABLE_MEM_COUNTER_ID)
#define TOPN_NODE(Dtt, Pos) \
(((Pos) < Dtt->pos) ? \
@@ -89,10 +79,12 @@
/* Obtain table static stack if available. NULL if not.
** Must be released with release_stack()
*/
-static DbTreeStack* get_static_stack(DbTableTree* tb)
+ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
{
- if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
- return &tb->static_stack;
+ if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
+ if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1))
+ return &tb->static_stack;
}
return NULL;
}
@@ -100,13 +92,15 @@ static DbTreeStack* get_static_stack(DbTableTree* tb)
/* Obtain static stack if available, otherwise empty dynamic stack.
** Must be released with release_stack()
*/
-static DbTreeStack* get_any_stack(DbTableTree* tb)
+static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
{
DbTreeStack* stack;
- if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
- return &tb->static_stack;
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (!erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1))
+ return &stack_container->static_stack;
}
- stack = erts_db_alloc(ERTS_ALC_T_DB_STK, (DbTable *) tb,
+ stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
stack->pos = 0;
stack->slot = 0;
@@ -114,62 +108,62 @@ static DbTreeStack* get_any_stack(DbTableTree* tb)
return stack;
}
-static void release_stack(DbTableTree* tb, DbTreeStack* stack)
+static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
{
- if (stack == &tb->static_stack) {
- ASSERT(erts_atomic_read_nob(&tb->is_stack_busy) == 1);
- erts_atomic_set_relb(&tb->is_stack_busy, 0);
- }
- else {
- erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb,
- (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (stack == &stack_container->static_stack) {
+ ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
+ erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
+ return;
+ }
}
+ erts_db_free(ERTS_ALC_T_DB_STK, tb,
+ (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
}
-static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
+static ERTS_INLINE void reset_stack(DbTreeStack* stack)
{
- tb->static_stack.pos = 0;
- tb->static_stack.slot = 0;
+ if (stack != NULL) {
+ stack->pos = 0;
+ stack->slot = 0;
+ }
}
-static ERTS_INLINE void free_term(DbTableTree *tb, TreeDbTerm* p)
+static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
- db_free_term((DbTable*)tb, p, offsetof(TreeDbTerm, dbterm));
+ if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
+ reset_stack(&tb->static_stack);
+ }
}
-static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableTree *tb, Eterm obj)
+static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableCommon *tb, Eterm obj)
{
TreeDbTerm* p;
- if (tb->common.compress) {
- p = db_store_term_comp(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
+ if (tb->compress) {
+ p = db_store_term_comp(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
}
else {
- p = db_store_term(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
+ p = db_store_term(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
}
return p;
}
-static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old,
+static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableCommon *tb, TreeDbTerm* old,
Eterm obj)
{
TreeDbTerm* p;
ASSERT(old != NULL);
- if (tb->common.compress) {
- p = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
+ if (tb->compress) {
+ p = db_store_term_comp(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
}
else {
- p = db_store_term(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
+ p = db_store_term(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
}
return p;
}
/*
-** Some macros for "direction stacks"
-*/
-#define DIR_LEFT 0
-#define DIR_RIGHT 1
-#define DIR_END 2
-
-/*
* Number of records to delete before trapping.
*/
#define DELETE_RECORD_LIMIT 12000
@@ -208,31 +202,37 @@ static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
** Datatypes
*/
+enum ms_key_boundness {
+ /* Order significant, larger means more "boundness" => less iteration */
+ MS_KEY_UNBOUND = 0,
+ MS_KEY_PARTIALLY_BOUND = 1,
+ MS_KEY_BOUND = 2,
+ MS_KEY_IMPOSSIBLE = 3
+};
+
/*
* This structure is filled in by analyze_pattern() for the select
* functions.
*/
struct mp_info {
- int something_can_match; /* The match_spec is not "impossible" */
- int some_limitation; /* There is some limitation on the search
- * area, i. e. least and/or most is set.*/
- int got_partial; /* The limitation has a partially bound
- * key */
+ enum ms_key_boundness key_boundness;
Eterm least; /* The lowest matching key (possibly
* partially bound expression) */
Eterm most; /* The highest matching key (possibly
* partially bound expression) */
-
- TreeDbTerm **save_term; /* If the key is completely bound, this
- * will be the Tree node we're searching
- * for, otherwise it will be useless */
Binary *mp; /* The compiled match program */
};
+struct select_common {
+ TreeDbTerm **root;
+};
+
+
/*
* Used by doit_select(_chunk)
*/
struct select_context {
+ struct select_common common;
Process *p;
Eterm accum;
Binary *mp;
@@ -248,6 +248,7 @@ struct select_context {
* Used by doit_select_count
*/
struct select_count_context {
+ struct select_common common;
Process *p;
Binary *mp;
Eterm end_condition;
@@ -261,8 +262,10 @@ struct select_count_context {
* Used by doit_select_delete
*/
struct select_delete_context {
+ struct select_common common;
Process *p;
- DbTableTree *tb;
+ DbTableCommon *tb;
+ DbTreeStack *stack;
Uint accum;
Binary *mp;
Eterm end_condition;
@@ -276,8 +279,9 @@ struct select_delete_context {
* Used by doit_select_replace
*/
struct select_replace_context {
+ struct select_common common;
Process *p;
- DbTableTree *tb;
+ DbTableCommon *tb;
Binary *mp;
Eterm end_condition;
Eterm *lastobj;
@@ -292,75 +296,83 @@ typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Ete
/*
** Forward declarations
*/
-static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key);
-static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
- Eterm object);
+static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm key, DbTreeStack *stack);
+static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm object, DbTableTree *stack);
static SWord do_free_tree_continue(DbTableTree *tb, SWord reds);
-static void free_term(DbTableTree *tb, TreeDbTerm* p);
-static int balance_left(TreeDbTerm **this);
-static int balance_right(TreeDbTerm **this);
+static void free_term(DbTable *tb, TreeDbTerm* p);
+int tree_balance_left(TreeDbTerm **this);
+int tree_balance_right(TreeDbTerm **this);
static int delsub(TreeDbTerm **this);
-static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot);
-static TreeDbTerm *find_node(DbTableTree *tb, Eterm key);
-static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key);
-static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack*, TreeDbTerm *this);
-static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key);
-static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key);
-static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*,
- Eterm key);
-static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack*,
- Eterm key);
-static void traverse_backwards(DbTableTree *tb,
+static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
+ DbTable *tb, DbTableTree *stack_container,
+ CATreeRootIterator *iter, int* is_EOT);
+static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
+ Eterm key, DbTableTree *stack_container);
+static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
+static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack *stack, TreeDbTerm *this);
+static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*,
+ struct select_common*, int forward);
+
+static void traverse_backwards(DbTableCommon *tb,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
- TreeDbTerm *,
- void *,
- int),
- void *context);
-static void traverse_forward(DbTableTree *tb,
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
+static void traverse_forward(DbTableCommon *tb,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
- TreeDbTerm *,
- void *,
- int),
- void *context);
-static void traverse_update_backwards(DbTableTree *tb,
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
+static void traverse_update_backwards(DbTableCommon *tb,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
+ int (*doit)(DbTableCommon *tb,
TreeDbTerm **, // out
- void *,
+ struct select_common*,
int),
- void *context);
-static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret,
- Eterm *partly_bound_key);
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+ struct select_common*,
+ CATreeRootIterator*);
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
-static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
-static int doit_select(DbTableTree *tb,
- TreeDbTerm *this,
- void *ptr,
+static int doit_select(DbTableCommon *tb,
+ TreeDbTerm *this,
+ struct select_common* ptr,
int forward);
-static int doit_select_count(DbTableTree *tb,
+static int doit_select_count(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
-static int doit_select_chunk(DbTableTree *tb,
+static int doit_select_chunk(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
-static int doit_select_delete(DbTableTree *tb,
+static int doit_select_delete(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
-static int doit_select_replace(DbTableTree *tb,
+static int doit_select_replace(DbTableCommon *tb,
TreeDbTerm **this_ptr,
- void *ptr,
+ struct select_common*,
int forward);
static int partly_bound_can_match_lesser(Eterm partly_bound_1,
@@ -396,24 +408,31 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_tree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret);
static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, int reversed, Eterm *ret);
+ Eterm pattern, int reversed, Eterm *ret,
+ enum DbIterSafety);
static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret);
+ Eterm pattern, Eterm *ret, enum DbIterSafety);
static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
- int reversed, Eterm *ret);
+ int reversed, Eterm *ret, enum DbIterSafety);
static int db_select_continue_tree(Process *p, DbTable *tbl,
- Eterm continuation, Eterm *ret);
+ Eterm continuation, Eterm *ret,
+ enum DbIterSafety*);
static int db_select_count_continue_tree(Process *p, DbTable *tbl,
- Eterm continuation, Eterm *ret);
+ Eterm continuation, Eterm *ret,
+ enum DbIterSafety*);
static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret);
+ Eterm pattern, Eterm *ret,
+ enum DbIterSafety);
static int db_select_delete_continue_tree(Process *p, DbTable *tbl,
- Eterm continuation, Eterm *ret);
+ Eterm continuation, Eterm *ret,
+ enum DbIterSafety*);
static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret);
+ Eterm pattern, Eterm *ret,
+ enum DbIterSafety);
static int db_select_replace_continue_tree(Process *p, DbTable *tbl,
- Eterm continuation, Eterm *ret);
+ Eterm continuation, Eterm *ret,
+ enum DbIterSafety*);
static int db_take_tree(Process *, DbTable *, Eterm, Eterm *);
static void db_print_tree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl);
@@ -425,8 +444,12 @@ static void db_foreach_offheap_tree(DbTable *,
void (*)(ErlOffHeap *, void *),
void *);
-static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds);
-
+static SWord db_delete_all_objects_tree(Process* p,
+ DbTable* tbl,
+ SWord reds,
+ Eterm* nitems_holder_wb);
+static Eterm db_delete_all_objects_get_nitems_from_holder_tree(Process* p,
+ Eterm nitems_holder);
#ifdef HARDDEBUG
static void db_check_table_tree(DbTable *tbl);
#endif
@@ -470,6 +493,7 @@ DbTableMethod db_tree =
db_select_replace_continue_tree,
db_take_tree,
db_delete_all_objects_tree,
+ db_delete_all_objects_get_nitems_from_holder_tree,
db_free_empty_table_tree,
db_free_table_continue_tree,
db_print_tree,
@@ -508,18 +532,18 @@ int db_create_tree(Process *p, DbTable *tbl)
return DB_ERROR_NONE;
}
-static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
+int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
TreeDbTerm *this;
- if (( this = tb->root ) == NULL) {
+ if (( this = root ) == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
/* Walk down the tree to the left */
- if ((stack = get_static_stack(tb)) != NULL) {
+ if ((stack = get_static_stack(stack_container)) != NULL) {
stack->pos = stack->slot = 0;
}
while (this->left != NULL) {
@@ -529,23 +553,27 @@ static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
if (stack) {
PUSH_NODE(stack, this);
stack->slot = 1;
- release_stack(tb,stack);
+ release_stack(tbl,stack_container,stack);
}
*ret = db_copy_key(p, tbl, &this->dbterm);
return DB_ERROR_NONE;
}
-static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- DbTreeStack* stack;
+ return db_first_tree_common(p, tbl, tb->root, ret, tb);
+}
+
+int db_next_tree_common(Process *p, DbTable *tbl,
+ TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack)
+{
TreeDbTerm *this;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
- stack = get_any_stack(tb);
- this = find_next(tb, stack, key);
- release_stack(tb,stack);
+ this = find_next(&tbl->common, root, stack, key);
if (this == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
@@ -554,18 +582,27 @@ static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
+static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack = get_any_stack(tbl, tb);
+ int ret_val = db_next_tree_common(p, tbl, tb->root, key, ret, stack);
+ release_stack(tbl,tb,stack);
+ return ret_val;
+}
+
+int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container)
+{
TreeDbTerm *this;
DbTreeStack* stack;
- if (( this = tb->root ) == NULL) {
+ if (( this = root ) == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
/* Walk down the tree to the right */
- if ((stack = get_static_stack(tb)) != NULL) {
+ if ((stack = get_static_stack(stack_container)) != NULL) {
stack->pos = stack->slot = 0;
}
while (this->right != NULL) {
@@ -574,24 +611,28 @@ static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
}
if (stack) {
PUSH_NODE(stack, this);
- stack->slot = NITEMS(tb);
- release_stack(tb,stack);
+ /* Always centralized counters when static stack is used */
+ stack->slot = NITEMS_CENTRALIZED(tbl);
+ release_stack(tbl,stack_container,stack);
}
*ret = db_copy_key(p, tbl, &this->dbterm);
return DB_ERROR_NONE;
}
-static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_last_tree_common(p, tbl, tb->root, ret, tb);
+}
+
+int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack)
+{
TreeDbTerm *this;
- DbTreeStack* stack;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
- stack = get_any_stack(tb);
- this = find_prev(tb, stack, key);
- release_stack(tb,stack);
+ this = find_prev(&tbl->common, root, stack, key);
if (this == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
@@ -600,25 +641,30 @@ static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static ERTS_INLINE Sint cmp_key(DbTableTree* tb, Eterm key, TreeDbTerm* obj) {
- return CMP(key, GETKEY(tb,obj->dbterm.tpl));
+static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack = get_any_stack(tbl, tb);
+ int res = db_prev_tree_common(p, tbl, tb->root, key, ret, stack);
+ release_stack(tbl,tb,stack);
+ return res;
}
-static ERTS_INLINE int cmp_key_eq(DbTableTree* tb, Eterm key, TreeDbTerm* obj) {
+static ERTS_INLINE int cmp_key_eq(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) {
Eterm obj_key = GETKEY(tb,obj->dbterm.tpl);
return is_same(key, obj_key) || CMP(key, obj_key) == 0;
}
-static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
+int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj,
+ int key_clash_fail, DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
/* Non recursive insertion in AVL tree, building our own stack */
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
Eterm key;
int dir;
@@ -626,16 +672,13 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
key = GETKEY(tb, tuple_val(obj));
- reset_static_stack(tb);
+ reset_static_stack(stack_container);
dstack[dpos++] = DIR_END;
for (;;)
if (!*this) { /* Found our place */
state = 1;
- if (erts_atomic_inc_read_nob(&tb->common.nitems) >= TREE_MAX_ELEMENTS) {
- erts_atomic_dec_nob(&tb->common.nitems);
- return DB_ERROR_SYSRES;
- }
+ INC_NITEMS(((DbTable*)tb));
*this = new_dbterm(tb, obj);
(*this)->balance = 0;
(*this)->left = (*this)->right = NULL;
@@ -724,9 +767,15 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
return DB_ERROR_NONE;
}
-static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableTree *tb = &tbl->tree;
+ return db_put_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb);
+}
+
+int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTableTree *stack_container)
+{
Eterm copy;
Eterm *hp, *hend;
TreeDbTerm *this;
@@ -737,13 +786,13 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
* The list created around it is purely for interface conformance.
*/
- this = find_node(tb,key);
+ this = find_node(tb,root,key,stack_container);
if (this == NULL) {
*ret = NIL;
} else {
hp = HAlloc(p, this->dbterm.size + 2);
hend = hp + this->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common, &this->dbterm, &hp, &MSO(p));
+ copy = db_copy_object_from_ets(tb, &this->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p,hend,hp);
@@ -751,18 +800,28 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
+static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_get_tree_common(p, &tb->common, tb->root, key, ret, tb);
+}
- *ret = (find_node(tb,key) == NULL) ? am_false : am_true;
+int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret,
+ DbTableTree *stack_container)
+{
+ *ret = (find_node(tb,root,key,stack_container) == NULL) ? am_false : am_true;
return DB_ERROR_NONE;
}
-static int db_get_element_tree(Process *p, DbTable *tbl,
- Eterm key, int ndex, Eterm *ret)
+static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_member_tree_common(&tb->common, tb->root, key, ret, tb);
+}
+
+int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ int ndex, Eterm *ret, DbTableTree *stack_container)
+{
/*
* Look the node up:
*/
@@ -776,54 +835,74 @@ static int db_get_element_tree(Process *p, DbTable *tbl,
* around the element here either.
*/
- this = find_node(tb,key);
+ this = find_node(tb,root,key,stack_container);
if (this == NULL) {
return DB_ERROR_BADKEY;
} else {
if (ndex > arityval(this->dbterm.tpl[0])) {
return DB_ERROR_BADPARAM;
}
- *ret = db_copy_element_from_ets(&tb->common, p, &this->dbterm, ndex, &hp, 0);
+ *ret = db_copy_element_from_ets(tb, p, &this->dbterm, ndex, &hp, 0);
}
return DB_ERROR_NONE;
}
-static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
+static int db_get_element_tree(Process *p, DbTable *tbl,
+ Eterm key, int ndex, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_get_element_tree_common(p, &tb->common, tb->root, key,
+ ndex, ret, tb);
+}
+
+int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */)
+{
TreeDbTerm *res;
*ret = am_true;
- if ((res = linkout_tree(tb, key)) != NULL) {
- free_term(tb, res);
+ if ((res = linkout_tree(&tbl->common, root,key, stack)) != NULL) {
+ free_term(tbl, res);
}
return DB_ERROR_NONE;
}
-static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
+static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_erase_tree_common(tbl, &tb->root, key, ret, &tb->static_stack);
+}
+
+int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
+ Eterm *ret, DbTableTree *stack_container)
+{
TreeDbTerm *res;
*ret = am_true;
- if ((res = linkout_object_tree(tb, object)) != NULL) {
- free_term(tb, res);
+ if ((res = linkout_object_tree(&tbl->common, root, object, stack_container)) != NULL) {
+ free_term(tbl, res);
}
return DB_ERROR_NONE;
}
-
-static int db_slot_tree(Process *p, DbTable *tbl,
- Eterm slot_term, Eterm *ret)
+static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_erase_object_tree_common(tbl, &tb->root, object, ret, tb);
+}
+
+int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm slot_term, Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter)
+{
Sint slot;
TreeDbTerm *st;
Eterm *hp, *hend;
Eterm copy;
-
+ int is_EOT = 0;
/*
* The notion of a "slot" is not natural in a tree, but we try to
* simulate it by giving the n'th node in the tree instead.
@@ -834,10 +913,10 @@ static int db_slot_tree(Process *p, DbTable *tbl,
if (is_not_small(slot_term) ||
((slot = signed_val(slot_term)) < 0) ||
- (slot > NITEMS(tb)))
+ (IS_CENTRALIZED_CTR(tbl) && slot > NITEMS_CENTRALIZED(tbl)))
return DB_ERROR_BADPARAM;
- if (slot == NITEMS(tb)) {
+ if (IS_CENTRALIZED_CTR(tbl) && slot == NITEMS_CENTRALIZED(tbl)) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
@@ -847,20 +926,31 @@ static int db_slot_tree(Process *p, DbTable *tbl,
* are counted from 1 and up.
*/
++slot;
- st = slot_search(p, tb, slot);
+ st = slot_search(p, root, slot, tbl, stack_container, iter, &is_EOT);
+ if (is_EOT) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
if (st == NULL) {
*ret = am_false;
return DB_ERROR_UNSPEC;
}
hp = HAlloc(p, st->dbterm.size + 2);
hend = hp + st->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common, &st->dbterm, &hp, &MSO(p));
+ copy = db_copy_object_from_ets(&tbl->common, &st->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p,hend,hp);
return DB_ERROR_NONE;
}
+static int db_slot_tree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb, NULL);
+}
+
static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3)
@@ -926,19 +1016,14 @@ static BIF_RETTYPE bif_trap3(Export *bif,
{
BIF_TRAP3(bif, p, p1, p2, p3);
}
-
-/*
-** This is called either when the select bif traps or when ets:select/1
-** is called. It does mostly the same as db_select_tree and may in either case
-** trap to itself again (via the ets:select/1 bif).
-** Note that this is common for db_select_tree and db_select_chunk_tree.
-*/
-static int db_select_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+
+int db_select_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_context sc;
unsigned sz;
@@ -951,7 +1036,6 @@ static int db_select_continue_tree(Process *p,
Sint chunk_size;
Sint reverse;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple but not the arity or
@@ -980,28 +1064,37 @@ static int db_select_continue_tree(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.chunk_size = chunk_size;
reverse = unsigned_val(tptr[7]);
sc.got = signed_val(tptr[8]);
- stack = get_any_stack(tb);
- if (chunk_size) {
- if (reverse) {
- traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
- } else {
- traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
- }
- } else {
- if (reverse) {
- traverse_forward(tb, stack, lastkey, &doit_select, &sc);
- } else {
- traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
- }
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
}
- release_stack(tb,stack);
+ else
+ sc.common.root = &((DbTableTree*)tb)->root;
+
+ if (sc.common.root) {
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ if (chunk_size) {
+ if (reverse) {
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ } else {
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ }
+ } else {
+ if (reverse) {
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ } else {
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ }
+ }
+ release_stack((DbTable*)tb,stack_container,stack);
- BUMP_REDS(p, 1000 - sc.max);
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
if (chunk_size) {
@@ -1082,13 +1175,30 @@ static int db_select_continue_tree(Process *p,
#undef RET_TO_BIF
}
+
+/*
+** This is called either when the select bif traps or when ets:select/1
+** is called. It does mostly the same as db_select_tree and may in either case
+** trap to itself again (via the ets:select/1 bif).
+** Note that this is common for db_select_tree and db_select_chunk_tree.
+*/
+static int db_select_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret,
+ enum DbIterSafety* safety_p)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_select_continue_tree_common(p, &tb->common,
+ continuation, ret, tb, NULL);
+}
-
-static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, int reverse, Eterm *ret)
+int db_select_tree_common(Process *p, DbTable *tb,
+ Eterm tid, Eterm pattern, int reverse, Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
/* Strategy: Traverse backwards to build resulting list from tail to head */
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_context sc;
struct mp_info mpi;
@@ -1121,42 +1231,62 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
sc.got = 0;
sc.chunk_size = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(NIL,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
RET_TO_BIF(sc.accum,DB_ERROR_NONE);
}
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
lastkey = GETKEY(tb, this->dbterm.tpl);
- }
sc.end_condition = mpi.most;
}
- traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
}
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
erts_fprintf(stderr,"Least: %T\n", mpi.least);
erts_fprintf(stderr,"Most: %T\n", mpi.most);
@@ -1192,16 +1322,21 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
}
-
-/*
-** This is called either when the select_count bif traps.
-*/
-static int db_select_count_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reverse, Eterm *ret,
+ enum DbIterSafety safety)
+{
+ return db_select_tree_common(p, tbl, tid,
+ pattern, reverse, ret, &tbl->tree, NULL);
+}
+
+int db_select_count_continue_tree_common(Process *p,
+ DbTable *tb,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_count_context sc;
unsigned sz;
@@ -1213,7 +1348,6 @@ static int db_select_count_continue_tree(Process *p,
Eterm *tptr;
Eterm egot;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple and everything else as
@@ -1245,11 +1379,21 @@ static int db_select_count_continue_tree(Process *p,
sc.got = unsigned_val(tptr[5]);
}
- stack = get_any_stack(tb);
- traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
- release_stack(tb,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tb->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ stack = get_any_stack(tb, stack_container);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
+ release_stack(tb,stack_container,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
@@ -1285,11 +1429,26 @@ static int db_select_count_continue_tree(Process *p,
#undef RET_TO_BIF
}
-
-static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+/*
+** This is called either when the select_count bif traps.
+*/
+static int db_select_count_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret,
+ enum DbIterSafety* safety_p)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, tb, NULL);
+}
+
+
+int db_select_count_tree_common(Process *p, DbTable *tb,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
+{
DbTreeStack* stack;
struct select_count_context sc;
struct mp_info mpi;
@@ -1303,7 +1462,6 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm egot;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1321,33 +1479,46 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
sc.keypos = tb->common.keypos;
sc.got = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &((DbTable*)tb)->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */);
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
}
- stack = get_any_stack(tb);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
- traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
- release_stack(tb,stack);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
@@ -1383,12 +1554,22 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Sint chunk_size,
- int reverse,
- Eterm *ret)
+static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret,
+ enum DbIterSafety safety)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, tb, NULL);
+}
+
+
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
+ Eterm tid, Eterm pattern, Sint chunk_size,
+ int reverse, Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
+{
DbTreeStack* stack;
struct select_context sc;
struct mp_info mpi;
@@ -1401,7 +1582,6 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
int errcode;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1421,20 +1601,26 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
sc.got = 0;
sc.chunk_size = chunk_size;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(am_EOT,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
if (sc.accum != NIL) {
hp=HAlloc(p, 3);
RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
@@ -1443,25 +1629,39 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
}
}
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.most;
}
- traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
}
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0 || sc.got == chunk_size) {
@@ -1530,15 +1730,25 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
}
-/*
-** This is called when select_delete traps
-*/
-static int db_select_delete_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reverse,
+ Eterm *ret, enum DbIterSafety safety)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size,
+ reverse, ret, tb, NULL);
+}
+
+
+int db_select_delete_continue_tree_common(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret,
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
+{
struct select_delete_context sc;
unsigned sz;
Eterm *hp;
@@ -1549,10 +1759,9 @@ static int db_select_delete_continue_tree(Process *p,
Eterm *tptr;
Eterm eaccsum;
-
#define RET_TO_BIF(Term, State) do { \
if (sc.erase_lastterm) { \
- free_term(tb, sc.lastterm); \
+ free_term(tbl, sc.lastterm); \
} \
*ret = (Term); \
return State; \
@@ -1571,7 +1780,8 @@ static int db_select_delete_continue_tree(Process *p,
mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
sc.p = p;
- sc.tb = tb;
+ sc.tb = &tbl->common;
+ sc.stack = stack;
if (is_big(tptr[5])) {
sc.accum = big_to_uint32(tptr[5]);
} else {
@@ -1580,17 +1790,26 @@ static int db_select_delete_continue_tree(Process *p,
sc.mp = mp;
sc.end_condition = NIL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tbl->common.keypos;
- ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
- traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
}
- key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
if (end_condition != NIL &&
cmp_partly_bound(end_condition,key) > 0) { /* done anyway */
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
@@ -1620,10 +1839,24 @@ static int db_select_delete_continue_tree(Process *p,
#undef RET_TO_BIF
}
-static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+static int db_select_delete_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret,
+ enum DbIterSafety* safety_p)
{
DbTableTree *tb = &tbl->tree;
+ ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
+ return db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &tb->static_stack, NULL);
+}
+
+int db_select_delete_tree_common(Process *p, DbTable *tbl,
+ Eterm tid, Eterm pattern,
+ Eterm *ret,
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
+{
struct select_delete_context sc;
struct mp_info mpi;
Eterm lastkey = THE_NON_VALUE;
@@ -1641,7 +1874,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
erts_bin_free(mpi.mp); \
} \
if (sc.erase_lastterm) { \
- free_term(tb, sc.lastterm); \
+ free_term(tbl, sc.lastterm); \
} \
*ret = (Term); \
return RetVal; \
@@ -1655,42 +1888,57 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
- sc.tb = tb;
+ sc.keypos = tbl->common.keypos;
+ sc.tb = &tbl->common;
+ sc.stack = stack;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(0,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_delete(tb,*(mpi.save_term),&sc, 0 /* direction doesn't
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't
matter */);
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
}
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, &tb->static_stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, stack, lastkey,
+ &doit_select_delete, &sc.common, iter);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
sz = size_object(key);
if (IS_USMALL(0, sc.accum)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -1714,7 +1962,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
/* Don't free mpi.mp, so don't use macro */
if (sc.erase_lastterm) {
- free_term(tb, sc.lastterm);
+ free_term(tbl, sc.lastterm);
}
*ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation);
return DB_ERROR_NONE;
@@ -1723,12 +1971,22 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_select_replace_continue_tree(Process *p,
+static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret,
+ enum DbIterSafety safety)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_select_delete_tree_common(p, tbl, tid, pattern, ret,
+ &tb->static_stack, NULL);
+}
+
+int db_select_replace_continue_tree_common(Process *p,
DbTable *tbl,
Eterm continuation,
- Eterm *ret)
+ Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_replace_context sc;
unsigned sz;
@@ -1764,7 +2022,7 @@ static int db_select_replace_continue_tree(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tbl->common.keypos;
if (is_big(tptr[5])) {
sc.replaced = big_to_uint32(tptr[5]);
} else {
@@ -1772,9 +2030,18 @@ static int db_select_replace_continue_tree(Process *p,
}
prev_replaced = sc.replaced;
- stack = get_any_stack(tb);
- traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc);
- release_stack(tb,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
+
+ stack = get_any_stack(tbl, stack_container);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl, stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
@@ -1782,7 +2049,7 @@ static int db_select_replace_continue_tree(Process *p,
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
if (end_condition != NIL &&
(cmp_partly_bound(end_condition,key) > 0)) {
/* done anyway */
@@ -1813,10 +2080,21 @@ static int db_select_replace_continue_tree(Process *p,
#undef RET_TO_BIF
}
-static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+static int db_select_replace_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret,
+ enum DbIterSafety* safety_p)
+{
+ return db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ &tbl->tree, NULL);
+}
+
+int db_select_replace_tree_common(Process *p, DbTable *tbl,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_replace_context sc;
struct mp_info mpi;
@@ -1843,48 +2121,64 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
sc.lastobj = NULL;
sc.p = p;
- sc.tb = tb;
+ sc.tb = &tbl->common;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tbl->common.keypos;
sc.replaced = 0;
- if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */);
- reset_static_stack(tb); /* may refer replaced term */
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ TreeDbTerm** pp;
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ pp = find_node2(&tbl->common, sc.common.root, mpi.least);
+ if (pp) {
+ doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */);
+ reset_static_stack(stack_container); /* may refer replaced term */
+ }
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- stack = get_any_stack(tb);
+ stack = get_any_stack(tbl,stack_container);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tbl, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc);
- release_stack(tb,stack);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
sz = size_object(key);
if (IS_USMALL(0, sc.replaced)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -1914,52 +2208,74 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret,
+ enum DbIterSafety safety)
+{
+ return db_select_replace_tree_common(p, tbl, tid, pattern, ret,
+ &tbl->tree, NULL);
+}
+
+int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */)
{
- DbTableTree *tb = &tbl->tree;
TreeDbTerm *this;
*ret = NIL;
- this = linkout_tree(tb, key);
+ this = linkout_tree(&tbl->common, root, key, stack);
if (this) {
Eterm copy, *hp, *hend;
hp = HAlloc(p, this->dbterm.size + 2);
hend = hp + this->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common,
+ copy = db_copy_object_from_ets(&tbl->common,
&this->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p, hend, hp);
- free_term(tb, this);
+ free_term(tbl, this);
}
return DB_ERROR_NONE;
}
+static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_take_tree_common(p, tbl, &tb->root,
+ key, ret, &tb->static_stack);
+}
+
/*
** Other interface routines (not directly coupled to one bif)
*/
-
-/* Display tree contents (for dump) */
-static void db_print_tree(fmtfn_t to, void *to_arg,
- int show,
- DbTable *tbl)
+void db_print_tree_common(fmtfn_t to, void *to_arg,
+ int show, TreeDbTerm *root, DbTable *tbl)
{
- DbTableTree *tb = &tbl->tree;
#ifdef TREE_DEBUG
if (show)
erts_print(to, to_arg, "\nTree data dump:\n"
"------------------------------------------------\n");
- do_dump_tree2(&tbl->tree, to, to_arg, show, tb->root, 0);
+ do_dump_tree2(&tbl->common, to, to_arg, show, root, 0);
if (show)
erts_print(to, to_arg, "\n"
"------------------------------------------------\n");
#else
- erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tb));
+ erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n",
+ erts_flxctr_read_approx(&tbl->common.counters, ERTS_DB_TABLE_NITEMS_COUNTER_ID));
#endif
}
+/* Display tree contents (for dump) */
+static void db_print_tree(fmtfn_t to, void *to_arg,
+ int show,
+ DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+ db_print_tree_common(to, to_arg, show, tb->root, tbl);
+}
+
/* release all memory occupied by a single table */
static int db_free_empty_table_tree(DbTable *tbl)
{
@@ -1984,31 +2300,57 @@ static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds)
(DbTable *) tb,
(void *) tb->static_stack.array,
sizeof(TreeDbTerm *) * STACK_NEED);
- ASSERT(erts_atomic_read_nob(&tb->common.memory_size)
- == sizeof(DbTable));
+ ASSERT(erts_flxctr_is_snapshot_ongoing(&tb->common.counters) ||
+ ((APPROX_MEM_CONSUMED(tb)
+ == sizeof(DbTable)) ||
+ (APPROX_MEM_CONSUMED(tb)
+ == (sizeof(DbTable) + sizeof(DbFixation)))));
}
return reds;
}
-static SWord db_delete_all_objects_tree(Process* p, DbTable* tbl, SWord reds)
+static SWord db_delete_all_objects_tree(Process* p,
+ DbTable* tbl,
+ SWord reds,
+ Eterm* nitems_holder_wb)
{
+ if (nitems_holder_wb != NULL) {
+ Uint nr_of_items =
+ erts_flxctr_read_centralized(&tbl->common.counters,
+ ERTS_DB_TABLE_NITEMS_COUNTER_ID);
+ *nitems_holder_wb = erts_make_integer(nr_of_items, p);
+ }
reds = db_free_table_continue_tree(tbl, reds);
if (reds < 0)
return reds;
db_create_tree(p, tbl);
- erts_atomic_set_nob(&tbl->tree.common.nitems, 0);
+ RESET_NITEMS(tbl);
return reds;
}
+static Eterm db_delete_all_objects_get_nitems_from_holder_tree(Process* p,
+ Eterm holder)
+{
+ (void)p;
+ return holder;
+}
+
static void do_db_tree_foreach_offheap(TreeDbTerm *,
void (*)(ErlOffHeap *, void *),
void *);
+void db_foreach_offheap_tree_common(TreeDbTerm *root,
+ void (*func)(ErlOffHeap *, void *),
+ void * arg)
+{
+ do_db_tree_foreach_offheap(root, func, arg);
+}
+
static void db_foreach_offheap_tree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void * arg)
{
- do_db_tree_foreach_offheap(tbl->tree.root, func, arg);
+ db_foreach_offheap_tree_common(tbl->tree.root, func, arg);
}
@@ -2033,13 +2375,14 @@ do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
do_db_tree_foreach_offheap(tdbt->right, func, arg);
}
-static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
+static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm key, DbTreeStack *stack) {
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
int dir;
TreeDbTerm *q = NULL;
@@ -2050,7 +2393,7 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
* keep the balance. As in insert, we do the stacking ourselves.
*/
- reset_static_stack(tb);
+ reset_stack(stack);
dstack[dpos++] = DIR_END;
for (;;) {
if (!*this) { /* Failure */
@@ -2076,30 +2419,30 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
tstack[tpos++] = this;
state = delsub(this);
}
- erts_atomic_dec_nob(&tb->common.nitems);
+ DEC_NITEMS(((DbTable*)tb));
break;
}
}
while (state && ( dir = dstack[--dpos] ) != DIR_END) {
this = tstack[--tpos];
if (dir == DIR_LEFT) {
- state = balance_left(this);
+ state = tree_balance_left(this);
} else {
- state = balance_right(this);
+ state = tree_balance_right(this);
}
}
return q;
}
-static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
- Eterm object)
+static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm object, DbTableTree *stack)
{
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
int dir;
TreeDbTerm *q = NULL;
@@ -2114,7 +2457,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
key = GETKEY(tb, tuple_val(object));
- reset_static_stack(tb);
+ reset_static_stack(stack);
dstack[dpos++] = DIR_END;
for (;;) {
if (!*this) { /* Failure */
@@ -2128,7 +2471,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
tstack[tpos++] = this;
this = &((*this)->right);
} else { /* Equal key, found the only possible matching object*/
- if (!db_eq(&tb->common,object,&(*this)->dbterm)) {
+ if (!db_eq(tb,object,&(*this)->dbterm)) {
return NULL;
}
q = (*this);
@@ -2143,16 +2486,16 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
tstack[tpos++] = this;
state = delsub(this);
}
- erts_atomic_dec_nob(&tb->common.nitems);
+ DEC_NITEMS(((DbTable*)tb));
break;
}
}
while (state && ( dir = dstack[--dpos] ) != DIR_END) {
this = tstack[--tpos];
if (dir == DIR_LEFT) {
- state = balance_left(this);
+ state = tree_balance_left(this);
} else {
- state = balance_right(this);
+ state = tree_balance_right(this);
}
}
return q;
@@ -2162,7 +2505,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
-static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
@@ -2173,17 +2516,12 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
Eterm *ptpl;
int i;
int num_heads = 0;
- Eterm key;
- Eterm partly_bound;
- int res;
- Eterm least = 0;
- Eterm most = 0;
+ Eterm least = THE_NON_VALUE;
+ Eterm most = THE_NON_VALUE;
+ enum ms_key_boundness boundness;
- mpi->some_limitation = 1;
- mpi->got_partial = 0;
- mpi->something_can_match = 0;
+ mpi->key_boundness = MS_KEY_IMPOSSIBLE;
mpi->mp = NULL;
- mpi->save_term = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
++num_heads;
@@ -2204,6 +2542,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
Eterm match;
Eterm guard;
Eterm body;
+ Eterm key;
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
@@ -2223,7 +2562,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
- if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
+ if(extra_validator != NULL && !extra_validator(tb->keypos, match, guard, body)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
@@ -2235,30 +2574,29 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
}
++i;
- partly_bound = NIL;
- res = key_given(tb, tpl, &(mpi->save_term), &partly_bound);
- if ( res >= 0 ) { /* Can match something */
- key = 0;
- mpi->something_can_match = 1;
- if (res > 0) {
- key = GETKEY(tb,tuple_val(tpl));
- } else if (partly_bound != NIL) {
- mpi->got_partial = 1;
- key = partly_bound;
- } else {
- mpi->some_limitation = 0;
- }
- if (key != 0) {
- if (least == 0 ||
- partly_bound_can_match_lesser(key,least)) {
- least = key;
- }
- if (most == 0 ||
- partly_bound_can_match_greater(key,most)) {
- most = key;
- }
- }
- }
+ boundness = key_boundness(tb, tpl, &key);
+ switch (boundness)
+ {
+ case MS_KEY_BOUND:
+ case MS_KEY_PARTIALLY_BOUND:
+ if (is_non_value(least) || partly_bound_can_match_lesser(key,least)) {
+ least = key;
+ }
+ if (is_non_value(most) || partly_bound_can_match_greater(key,most)) {
+ most = key;
+ }
+ break;
+ case MS_KEY_IMPOSSIBLE:
+ case MS_KEY_UNBOUND:
+ break;
+ }
+ if (mpi->key_boundness > boundness)
+ mpi->key_boundness = boundness;
+ }
+
+ if (mpi->key_boundness == MS_KEY_BOUND && !CMP_EQ(least, most)) {
+ /* Several different bound keys */
+ mpi->key_boundness = MS_KEY_PARTIALLY_BOUND;
}
mpi->least = least;
mpi->most = most;
@@ -2300,7 +2638,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
PUSH_NODE(&tb->static_stack, root);
root = p;
} else {
- free_term(tb, root);
+ free_term((DbTable*)tb, root);
if (--reds < 0) {
return reds; /* Done enough for now */
}
@@ -2314,7 +2652,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
/*
* Deletion helpers
*/
-static int balance_left(TreeDbTerm **this)
+int tree_balance_left(TreeDbTerm **this)
{
TreeDbTerm *p, *p1, *p2;
int b1, b2, h = 1;
@@ -2359,7 +2697,7 @@ static int balance_left(TreeDbTerm **this)
return h;
}
-static int balance_right(TreeDbTerm **this)
+int tree_balance_right(TreeDbTerm **this)
{
TreeDbTerm *p, *p1, *p2;
int b1, b2, h = 1;
@@ -2431,7 +2769,7 @@ static int delsub(TreeDbTerm **this)
h = 1;
while (tpos && h) {
r = tstack[--tpos];
- h = balance_right(r);
+ h = tree_balance_right(r);
}
return h;
}
@@ -2440,11 +2778,30 @@ static int delsub(TreeDbTerm **this)
* Helper for db_slot
*/
-static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
+static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
+ Sint slot, DbTable *tb,
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter,
+ int* is_EOT)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
- DbTreeStack* stack = get_any_stack(tb);
+ TreeDbTerm *lastobj;
+ Eterm lastkey;
+ TreeDbTerm **pp;
+ DbTreeStack* stack;
+
+ if (iter) {
+ /* Find first non-empty tree */
+ while (!root) {
+ TreeDbTerm** pp = catree_find_next_root(iter, NULL);
+ if (!pp)
+ return NULL;
+ root = *pp;
+ }
+ }
+
+ stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2456,57 +2813,92 @@ static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
are not recorded */
stack->pos = 0;
}
- if (EMPTY_NODE(stack)) {
- this = tb->root;
- if (this == NULL)
- goto done;
- while (this->left != NULL){
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- stack->slot = 1;
- }
- this = TOP_NODE(stack);
- while (stack->slot != slot && this != NULL) {
- if (slot > stack->slot) {
- if (this->right != NULL) {
- this = this->right;
- while (this->left != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->left == tmp)
- break;
- }
- }
- ++(stack->slot);
- } else {
- if (this->left != NULL) {
- this = this->left;
- while (this->right != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->right == tmp)
- break;
- }
- }
- --(stack->slot);
- }
+ while (1) {
+ if (EMPTY_NODE(stack)) {
+ this = root;
+ if (this == NULL)
+ goto next_root;
+ while (this->left != NULL){
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ stack->slot++;
+ }
+ this = TOP_NODE(stack);
+ while (stack->slot != slot) {
+ ASSERT(this);
+ lastobj = this;
+ if (slot > stack->slot) {
+ if (this->right != NULL) {
+ this = this->right;
+ while (this->left != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->left == tmp)
+ break;
+ }
+ }
+ ++(stack->slot);
+ } else {
+ if (this->left != NULL) {
+ this = this->left;
+ while (this->right != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->right == tmp)
+ break;
+ }
+ }
+ --(stack->slot);
+ }
+ }
+ /* Found slot */
+ ASSERT(this);
+ break;
+
+next_root:
+ if (!iter) {
+ if (stack->slot == (slot-1)) {
+ *is_EOT = 1;
+ }
+ break; /* EOT */
+ }
+
+ ASSERT(slot > stack->slot);
+ if (lastobj) {
+ lastkey = GETKEY(tb, lastobj->dbterm.tpl);
+ lastobj = NULL;
+ }
+ pp = catree_find_next_root(iter, &lastkey);
+ if (!pp) {
+ if (stack->slot == (slot-1)) {
+ *is_EOT = 1;
+ }
+ break; /* EOT */
+ }
+ root = *pp;
+ stack->pos = 0;
+ find_next(&tb->common, root, stack, lastkey);
}
-done:
- release_stack(tb,stack);
+
+ release_stack(tb,stack_container,stack);
return this;
}
@@ -2514,7 +2906,8 @@ done:
* Find next and previous in sort order
*/
-static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
+static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key) {
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
@@ -2526,7 +2919,7 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2539,7 +2932,7 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
this = this->right;
} else if (c < 0) {
if (this->left == NULL) /* Done */
- return this;
+ goto found_next;
else
this = this->left;
} else
@@ -2554,8 +2947,6 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
this = this->left;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- ++(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2564,13 +2955,17 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
return NULL;
}
} while (this->right == tmp);
- if (stack->slot > 0)
- ++(stack->slot);
}
+
+found_next:
+ if (stack->slot > 0)
+ ++(stack->slot);
+
return this;
}
-static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
+static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key) {
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
@@ -2582,7 +2977,7 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2595,7 +2990,7 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
this = this->left;
} else if (c > 0) {
if (this->right == NULL) /* Done */
- return this;
+ goto found_prev;
else
this = this->right;
} else
@@ -2610,8 +3005,6 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
this = this->right;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- --(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2620,74 +3013,112 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
return NULL;
}
} while (this->left == tmp);
- if (stack->slot > 0)
- --(stack->slot);
}
+
+found_prev:
+ if (stack->slot > 0)
+ --(stack->slot);
+
return this;
}
-static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
- Eterm key)
+
+/* @brief Find object with smallest key of all larger than partially bound key.
+ * Can be used as a starting point for a reverse iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param *rootpp Will return pointer to root pointer of tree with found object.
+ * @param iter Root iterator or NULL for plain DbTableTree.
+ * @param stack A stack to use. Will be cleared.
+ *
+ * @return found object or NULL if no such key exists.
+ */
+static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm pb_key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_next_from_pb_key_root(pb_key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) {
+ if (( c = cmp_partly_bound(pb_key,GETKEY(tbl, this->dbterm.tpl))) >= 0) {
if (this->right == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->right == tmp);
- return this;
- } else
- this = this->right;
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->right;
} else /*if (c < 0)*/ {
if (this->left == NULL) /* Done */
return this;
- else
- this = this->left;
+ candidate = stack->pos;
+ this = this->left;
}
}
}
-static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
- Eterm key)
+/* @brief Find object with largest key of all smaller than partially bound key.
+ * Can be used as a starting point for a forward iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param *rootpp Will return pointer to root pointer of found object.
+ * @param iter Root iterator or NULL for plain DbTableTree.
+ * @param stack A stack to use. Will be cleared.
+ *
+ * @return found object or NULL if no such key exists.
+ */
+static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm pb_key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_prev_from_pb_key_root(pb_key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) {
+ if (( c = cmp_partly_bound(pb_key,GETKEY(tbl, this->dbterm.tpl))) <= 0) {
if (this->left == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->left == tmp);
- return this;
- } else
- this = this->left;
- } else /*if (c < 0)*/ {
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->left;
+ } else /*if (c > 0)*/ {
if (this->right == NULL) /* Done */
return this;
- else
- this = this->right;
+ candidate = stack->pos;
+ this = this->right;
}
}
}
@@ -2696,16 +3127,17 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
/*
* Just lookup a node
*/
-static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
+static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
+ Eterm key, DbTableTree *stack_container)
{
TreeDbTerm *this;
Sint res;
- DbTreeStack* stack = get_static_stack(tb);
+ DbTreeStack* stack = get_static_stack(stack_container);
if(!stack || EMPTY_NODE(stack)
|| !cmp_key_eq(tb, key, (this=TOP_NODE(stack)))) {
- this = tb->root;
+ this = root;
while (this != NULL && (res = cmp_key(tb,key,this)) != 0) {
if (res < 0)
this = this->left;
@@ -2714,7 +3146,7 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
}
}
if (stack) {
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
}
return this;
}
@@ -2722,12 +3154,12 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
/*
* Lookup a node and return the address of the node pointer in the tree
*/
-static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
+static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key)
{
TreeDbTerm **this;
Sint res;
- this = &tb->root;
+ this = root;
while ((*this) != NULL && (res = cmp_key(tb, key, *this)) != 0) {
if (res < 0)
this = &((*this)->left);
@@ -2744,7 +3176,8 @@ static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
* Tries to reuse the existing stack for performance.
*/
-static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *this) {
+static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack *stack, TreeDbTerm *this) {
Eterm key = GETKEY(tb, this->dbterm.tpl);
TreeDbTerm *tmp;
TreeDbTerm *parent;
@@ -2757,7 +3190,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( tmp = tb->root ) == NULL)
+ if (( tmp = *root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, tmp);
@@ -2783,7 +3216,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
parent = TOPN_NODE(stack, 1);
if (parent == NULL)
- return ((this != tb->root) ? NULL : &(tb->root));
+ return ((this != *root) ? NULL : root);
if (parent->left == this)
return &(parent->left);
if (parent->right == this)
@@ -2791,12 +3224,11 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
return NULL;
}
-static int
-db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
- DbUpdateHandle* handle)
+int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm obj, DbUpdateHandle* handle,
+ DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
- TreeDbTerm **pp = find_node2(tb, key);
+ TreeDbTerm **pp = find_node2(&tbl->common, root, key);
int flags = 0;
if (pp == NULL) {
@@ -2807,18 +3239,19 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
int arity = arityval(*objp);
Eterm *htop, *hend;
- ASSERT(arity >= tb->common.keypos);
+ ASSERT(arity >= tbl->common.keypos);
htop = HAlloc(p, arity + 1);
hend = htop + arity + 1;
sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
- htop[tb->common.keypos] = key;
+ htop[tbl->common.keypos] = key;
obj = make_tuple(htop);
- if (db_put_tree(tbl, obj, 1) != DB_ERROR_NONE) {
+ if (db_put_tree_common(&tbl->common, root,
+ obj, 1, stack_container) != DB_ERROR_NONE) {
return 0;
}
- pp = find_node2(tb, key);
+ pp = find_node2(&tbl->common, root, key);
ASSERT(pp != NULL);
HRelease(p, hend, htop);
flags |= DB_NEW_OBJECT;
@@ -2833,21 +3266,28 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
return 1;
}
-static void
-db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
+static int
+db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
+ DbUpdateHandle* handle)
{
- DbTable *tbl = handle->tb;
DbTableTree *tb = &tbl->tree;
+ return db_lookup_dbterm_tree_common(p, tbl, &tb->root, key, obj, handle, tb);
+}
+
+void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
+ DbTableTree *stack_container)
+{
+ DbTable *tbl = handle->tb;
TreeDbTerm *bp = (TreeDbTerm *) *handle->bp;
if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
Eterm ret;
- db_erase_tree(tbl, GETKEY(tb, bp->dbterm.tpl), &ret);
+ db_erase_tree(tbl, GETKEY(&tbl->common, bp->dbterm.tpl), &ret);
} else if (handle->flags & DB_MUST_RESIZE) {
db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm));
- reset_static_stack(tb);
+ reset_static_stack(stack_container);
- free_term(tb, bp);
+ free_term(tbl, bp);
}
#ifdef DEBUG
handle->dbterm = 0;
@@ -2855,156 +3295,207 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
return;
}
+static void
+db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
+{
+ DbTable *tbl = handle->tb;
+ DbTableTree *tb = &tbl->tree;
+ db_finalize_dbterm_tree_common(cret, handle, tb);
+}
+
/*
* Traverse the tree with a callback function, used by db_match_xxx
*/
-static void traverse_backwards(DbTableTree *tb,
+static void traverse_backwards(DbTableCommon *tb,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_prev(tb, stack, lastkey);
+ next = find_prev(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
/*
* Traverse the tree with a callback function, used by db_match_xxx
*/
-static void traverse_forward(DbTableTree *tb,
+static void traverse_forward(DbTableCommon *tb,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm **root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_next_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->left;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_next(tb, stack, lastkey);
+ next = find_next(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_next(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_next_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_next(tb, *root, stack, lastkey);
}
}
/*
* Traverse the tree with an update callback function, used by db_select_replace
*/
-static void traverse_update_backwards(DbTableTree *tb,
+static void traverse_update_backwards(DbTableCommon *tb,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree*,
+ int (*doit)(DbTableCommon*,
TreeDbTerm**,
- void*,
+ struct select_common*,
int),
- void* context)
+ struct select_common* context,
+ CATreeRootIterator* iter)
{
int res;
TreeDbTerm *this, *next, **this_ptr;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ context->root = root;
+ }
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next) {
+ PUSH_NODE(stack, next);
+ next = next->right;
}
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, stack, lastkey);
+ next = TOP_NODE(stack);
}
+ else
+ next = find_prev(tb, *root, stack, lastkey);
+
+
+ while (1) {
+ while (next) {
+ this = next;
+ this_ptr = find_ptr(tb, root, stack, this);
+ ASSERT(this_ptr != NULL);
+ res = (*doit)(tb, this_ptr, context, 0);
+ this = *this_ptr;
+ REPLACE_TOP_NODE(stack, this);
+ if (!res)
+ return;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- this_ptr = find_ptr(tb, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
+ if (!iter)
return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
-/*
- * Returns 0 if not given 1 if given and -1 on no possible match
- * if key is given; *ret is set to point to the object concerned.
- */
-static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret,
- Eterm *partly_bound)
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp)
{
- TreeDbTerm **this;
Eterm key;
- ASSERT(ret != NULL);
if (pattern == am_Underscore || db_is_variable(pattern) != -1)
- return 0;
- key = db_getkey(tb->common.keypos, pattern);
+ return MS_KEY_UNBOUND;
+ key = db_getkey(tb->keypos, pattern);
if (is_non_value(key))
- return -1; /* can't possibly match anything */
+ return MS_KEY_IMPOSSIBLE; /* can't possibly match anything */
if (!db_has_variable(key)) { /* Bound key */
- if (( this = find_node2(tb, key) ) == NULL) {
- return -1;
- }
- *ret = this;
- return 1;
- } else if (partly_bound != NULL && key != am_Underscore &&
- db_is_variable(key) < 0 && !db_has_map(key))
- *partly_bound = key;
+ *keyp = key;
+ return MS_KEY_BOUND;
+ } else if (key != am_Underscore &&
+ db_is_variable(key) < 0 && !db_has_map(key)) {
+
+ *keyp = key;
+ return MS_KEY_PARTIALLY_BOUND;
+ }
- return 0;
+ return MS_KEY_UNBOUND;
}
@@ -3072,7 +3563,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
}
}
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) {
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
+{
int done = 0;
Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
#ifdef HARDDEBUG
@@ -3118,7 +3610,7 @@ static int partly_bound_can_match_lesser(Eterm partly_bound_1,
if (ret)
erts_fprintf(stderr," can match lesser than ");
else
- erts_fprintf(stderr," can not match lesser than ");
+ erts_fprintf(stderr," cannot match lesser than ");
erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
return ret;
@@ -3136,7 +3628,7 @@ static int partly_bound_can_match_greater(Eterm partly_bound_1,
if (ret)
erts_fprintf(stderr," can match greater than ");
else
- erts_fprintf(stderr," can not match greater than ");
+ erts_fprintf(stderr," cannot match greater than ");
erts_fprintf(stderr,"%T\n",partly_bound_2);
#endif
return ret;
@@ -3288,7 +3780,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
* Callback functions for the different match functions
*/
-static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3306,7 +3799,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2);
+ ret = db_match_dbterm(tb, sc->p,sc->mp, &this->dbterm, &hp, 2);
if (is_value(ret)) {
sc->accum = CONS(hp, ret, sc->accum);
}
@@ -3316,7 +3809,8 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_count_context *sc = (struct select_count_context *) ptr;
@@ -3330,7 +3824,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
++(sc->got);
}
@@ -3340,7 +3834,8 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3359,7 +3854,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, &hp, 2);
if (is_value(ret)) {
++(sc->got);
sc->accum = CONS(hp, ret, sc->accum);
@@ -3371,7 +3866,8 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
}
-static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common *ptr,
int forward)
{
struct select_delete_context *sc = (struct select_delete_context *) ptr;
@@ -3379,7 +3875,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
Eterm key;
if (sc->erase_lastterm)
- free_term(tb, sc->lastterm);
+ free_term((DbTable*)tb, sc->lastterm);
sc->erase_lastterm = 0;
sc->lastterm = this;
@@ -3387,10 +3883,10 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
cmp_partly_bound(sc->end_condition,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)
return 0;
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
key = GETKEY(sc->tb, this->dbterm.tpl);
- linkout_tree(sc->tb, key);
+ linkout_tree(sc->tb, sc->common.root, key, sc->stack);
sc->erase_lastterm = 1;
++sc->accum;
}
@@ -3400,7 +3896,8 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
+static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this,
+ struct select_common* ptr,
int forward)
{
struct select_replace_context *sc = (struct select_replace_context *) ptr;
@@ -3414,13 +3911,13 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &(*this)->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &(*this)->dbterm, NULL, 0);
if (is_value(ret)) {
TreeDbTerm* new;
TreeDbTerm* old = *this;
#ifdef DEBUG
- Eterm key = db_getkey(tb->common.keypos, ret);
+ Eterm key = db_getkey(tb->keypos, ret);
ASSERT(is_value(key));
ASSERT(cmp_key(tb, key, old) == 0);
#endif
@@ -3430,7 +3927,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
new->balance = old->balance;
sc->lastobj = new->dbterm.tpl;
*this = new;
- free_term(tb, old);
+ free_term((DbTable*)tb, old);
++(sc->replaced);
}
if (--(sc->max) <= 0) {
@@ -3439,8 +3936,14 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
return 1;
}
+void
+erts_db_foreach_thr_prgr_offheap_tree(void (*func)(ErlOffHeap *, void *),
+ void *arg)
+{
+}
+
#ifdef TREE_DEBUG
-static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show,
+static void do_dump_tree2(DbTableCommon* tb, int to, void *to_arg, int show,
TreeDbTerm *t, int offset)
{
if (t == NULL)
@@ -3449,7 +3952,7 @@ static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show,
if (show) {
const char* prefix;
Eterm term;
- if (tb->common.compress) {
+ if (tb->compress) {
prefix = "key=";
term = GETKEY(tb, t->dbterm.tpl);
}
@@ -3504,7 +4007,7 @@ static void check_slot_pos(DbTableTree *tb)
"element position %d is really 0x%08X, when stack says "
"it's 0x%08X\n", tb->stack.slot, t,
tb->stack.array[tb->stack.pos - 1]);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
}
}
@@ -3519,14 +4022,14 @@ static void check_saved_stack(DbTableTree *tb)
if (t != stack->array[0]) {
erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
stack->array[0], t);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
while (n < stack->pos) {
if (t == NULL) {
erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
" stack depth is %d\n", n);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
n++;
@@ -3540,7 +4043,7 @@ static void check_saved_stack(DbTableTree *tb)
"represent child pointer in tree!"
"(left == 0x%08X, right == 0x%08X\n",
n, tb->stack[n], t->left, t->right);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
}
@@ -3559,7 +4062,7 @@ static int check_table_tree(DbTableTree* tb, TreeDbTerm *t)
erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
t->balance, t->left, t->right);
erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, t, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, t, 0);
erts_fprintf(stderr,"\n---------------------------------\n");
}
return ((rh > lh) ? rh : lh) + 1;