aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-10-31 16:31:34 +0100
committerGitHub <[email protected]>2018-10-31 16:31:34 +0100
commitfa6994f5c8384a15196121fce2983291c9c21245 (patch)
tree0b01c4e37ce21c5863d7607214ca097445ed673d
parenta03b0b0298208216481fdc2a373e5cda893535d2 (diff)
parentcc18836780d7d047bf53b1ff8d94a6b31b58f98a (diff)
downloadotp-fa6994f5c8384a15196121fce2983291c9c21245.tar.gz
otp-fa6994f5c8384a15196121fce2983291c9c21245.tar.bz2
otp-fa6994f5c8384a15196121fce2983291c9c21245.zip
Merge PR-1997 from sverker/erts/ordered_set-select-improvements/OTP-15325
Even more scalable ETS ordered_set with write_concurrency
-rw-r--r--erts/emulator/beam/erl_alloc.c2
-rw-r--r--erts/emulator/beam/erl_bif_binary.c3
-rw-r--r--erts/emulator/beam/erl_bif_info.c8
-rw-r--r--erts/emulator/beam/erl_db.c28
-rw-r--r--erts/emulator/beam/erl_db.h1
-rw-r--r--erts/emulator/beam/erl_db_catree.c1414
-rw-r--r--erts/emulator/beam/erl_db_catree.h35
-rw-r--r--erts/emulator/beam/erl_db_hash.c4
-rw-r--r--erts/emulator/beam/erl_db_tree.c1081
-rw-r--r--erts/emulator/beam/erl_db_tree_util.h49
-rw-r--r--erts/emulator/beam/erl_db_util.h15
-rw-r--r--erts/emulator/beam/erl_lock_check.c34
-rw-r--r--lib/stdlib/doc/src/ets.xml14
-rw-r--r--lib/stdlib/src/stdlib.app.src2
-rw-r--r--lib/stdlib/test/ets_SUITE.erl512
15 files changed, 1847 insertions, 1355 deletions
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 1d4b33c598..e6169ebeaa 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -4041,7 +4041,7 @@ debug_realloc(ErtsAlcType_t type, void *extra, void *ptr, Uint size)
erts_hdbg_chk_blks();
#endif
- if (old_size > size)
+ if (ptr && old_size > size)
sys_memset((void *) (((char *) ptr) + size),
0xf,
sizeof(Uint) + old_size - size);
diff --git a/erts/emulator/beam/erl_bif_binary.c b/erts/emulator/beam/erl_bif_binary.c
index 5b3f091ccc..d465f6c6b9 100644
--- a/erts/emulator/beam/erl_bif_binary.c
+++ b/erts/emulator/beam/erl_bif_binary.c
@@ -1051,14 +1051,13 @@ static int do_binary_match_compile(Eterm argument, Eterm *tag, Binary **binp)
Uint bitoffs, bitsize;
byte *temp_alloc = NULL;
MyAllocator my;
- BMData *bmd;
Binary *bin;
ERTS_GET_BINARY_BYTES(comp_term, bytes, bitoffs, bitsize);
if (bitoffs != 0) {
bytes = erts_get_aligned_binary_bytes(comp_term, &temp_alloc);
}
- bmd = create_bmdata(&my, bytes, characters, &bin);
+ create_bmdata(&my, bytes, characters, &bin);
erts_free_aligned_binary_bytes(temp_alloc);
CHECK_ALLOCATOR(my);
*tag = am_bm;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 2a8e7e8858..3b0f0d33fa 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -4669,6 +4669,14 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
BIF_RET(am_notsup);
#endif
}
+ else if (ERTS_IS_ATOM_STR("ets_force_split", BIF_ARG_1)) {
+ if (is_tuple(BIF_ARG_2)) {
+ Eterm* tpl = tuple_val(BIF_ARG_2);
+
+ if (erts_ets_force_split(tpl[1], tpl[2] == am_true))
+ BIF_RET(am_ok);
+ }
+ }
}
BIF_ERROR(BIF_P, BADARG);
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index 3653c0bf7c..df6f42edd3 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -4247,9 +4247,20 @@ static Eterm table_info(Process* p, DbTable* tb, Eterm What)
make_small(stats.max_chain_len),
make_small(stats.kept_items));
}
- else {
+ else if (IS_CATREE_TABLE(tb->common.status)) {
+ DbCATreeStats stats;
+ Eterm* hp;
+
+ db_calc_stats_catree(&tb->catree, &stats);
+ hp = HAlloc(p, 4);
+ ret = TUPLE3(hp,
+ make_small(stats.route_nodes),
+ make_small(stats.base_nodes),
+ make_small(stats.max_depth));
+
+ }
+ else
ret = am_false;
- }
}
return ret;
}
@@ -4454,3 +4465,16 @@ void erts_lcnt_update_db_locks(int enable) {
#ifdef ETS_DBG_FORCE_TRAP
erts_aint_t erts_ets_dbg_force_trap = 0;
#endif
+
+int erts_ets_force_split(Eterm tid, int on)
+{
+ DbTable* tb = tid2tab(tid);
+ if (!tb || !IS_CATREE_TABLE(tb->common.type))
+ return 0;
+
+ db_lock(tb, LCK_WRITE);
+ if (!(tb->common.status & DB_DELETE))
+ db_catree_force_split(&tb->catree, on);
+ db_unlock(tb, LCK_WRITE);
+ return 1;
+}
diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h
index 7a915ccea2..5955d42aae 100644
--- a/erts/emulator/beam/erl_db.h
+++ b/erts/emulator/beam/erl_db.h
@@ -130,6 +130,7 @@ extern Export ets_select_continue_exp;
extern erts_atomic_t erts_ets_misc_mem_size;
Eterm erts_ets_colliding_names(Process*, Eterm name, Uint cnt);
+int erts_ets_force_split(Eterm tid, int on);
Uint erts_db_get_max_tabs(void);
Eterm erts_db_make_tid(Process *c_p, DbTableCommon *tb);
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index a8e48bce1b..b52a4a53fe 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -96,6 +96,12 @@
#include "erl_db_tree.h"
#include "erl_db_tree_util.h"
+#ifdef DEBUG
+# define IF_DEBUG(X) X
+#else
+# define IF_DEBUG(X)
+#endif
+
/*
** Forward declarations
*/
@@ -158,6 +164,14 @@ db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
DbUpdateHandle*);
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent);
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent);
+
+
/*
** External interface
*/
@@ -241,30 +255,6 @@ static ERTS_INLINE Sint cmp_key_route(Eterm key,
return CMP(key, GET_ROUTE_NODE_KEY(obj));
}
-static ERTS_INLINE void push_node_dyn_array(DbTable *tb,
- CATreeNodeStack *stack,
- DbTableCATreeNode *node)
-{
- int i;
- if (stack->pos == stack->size) {
- DbTableCATreeNode **newArray =
- erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
- sizeof(DbTableCATreeNode*) * (stack->size*2));
- for (i = 0; i < stack->pos; i++) {
- newArray[i] = stack->array[i];
- }
- if (stack->size > STACK_NEED) {
- /* Dynamically allocated array that needs to be deallocated */
- erts_db_free(ERTS_ALC_T_DB_STK, tb,
- stack->array,
- sizeof(DbTableCATreeNode *) * stack->size);
- }
- stack->array = newArray;
- stack->size = stack->size*2;
- }
- PUSH_NODE(stack, node);
-}
-
/*
* Used by the split_tree function
*/
@@ -578,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
p = *this;
if (dir == DIR_LEFT) {
switch (p->balance) {
- case 1:
+ case 1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = -1;
break;
- case -1: /* The icky case */
+ case -1: /* The icky case */
p1 = p->left;
if (p1->balance == -1) { /* Single LL rotation */
p->left = p1->right;
@@ -608,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
} else { /* dir == DIR_RIGHT */
switch (p->balance) {
- case -1:
+ case -1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = 1;
break;
- case 1:
+ case 1:
p1 = p->right;
if (p1->balance == 1) { /* Single RR rotation */
p->right = p1->left;
@@ -646,6 +636,31 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
}
+#ifdef DEBUG
+# define FORCE_RANDOM_SPLIT_JOIN
+#endif
+#ifdef FORCE_RANDOM_SPLIT_JOIN
+static int dbg_fastrand(void)
+{
+ static int g_seed = 648835;
+ g_seed = (214013*g_seed+2531011);
+ return (g_seed>>16)&0x7FFF;
+}
+
+static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node)
+{
+ switch (dbg_fastrand() % 8) {
+ case 1:
+ base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT;
+ break;
+ case 2:
+ base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT;
+ break;
+ }
+}
+#else
+# define dbg_maybe_force_splitjoin(N)
+#endif /* FORCE_RANDOM_SPLIT_JOIN */
static ERTS_INLINE
int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
@@ -657,9 +672,10 @@ int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
* Locks a base node without adjusting the lock statistics
*/
static ERTS_INLINE
-void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
+void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rwlock(&base_node->u.base.lock);
}
/*
@@ -667,33 +683,57 @@ void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
* the lock was contended or not
*/
static ERTS_INLINE
-void wlock_base_node(DbTableCATreeBaseNode *base_node)
+void wlock_base_node(DbTableCATreeNode *base_node)
{
- if (try_wlock_base_node(base_node)) {
+ ASSERT(base_node->is_base_node);
+ if (try_wlock_base_node(&base_node->u.base)) {
/* The lock is contended */
wlock_base_node_no_stats(base_node);
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
} else {
- base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
}
}
static ERTS_INLINE
-void wunlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rwunlock(&base_node->lock);
+ erts_rwmtx_rwunlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void rlock_base_node(DbTableCATreeBaseNode *base_node)
+void wunlock_adapt_base_node(DbTableCATree* tb,
+ DbTableCATreeNode* node,
+ DbTableCATreeNode* parent,
+ int current_level)
+{
+ dbg_maybe_force_splitjoin(node);
+ if ((!node->u.base.root && parent && !(tb->common.status
+ & DB_CATREE_FORCE_SPLIT))
+ || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
+ join_catree(tb, node, parent);
+ }
+ else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
+ split_catree(tb, node, parent);
+ }
+ else {
+ wunlock_base_node(node);
+ }
+}
+
+static ERTS_INLINE
+void rlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_rlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rlock(&base_node->u.base.lock);
}
static ERTS_INLINE
-void runlock_base_node(DbTableCATreeBaseNode *base_node)
+void runlock_base_node(DbTableCATreeNode *base_node)
{
- erts_rwmtx_runlock(&base_node->lock);
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_runlock(&base_node->u.base.lock);
}
static ERTS_INLINE
@@ -710,80 +750,121 @@ void unlock_route_node(DbTableCATreeNode *route_node)
erts_mtx_unlock(&route_node->u.route.lock);
}
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+ iter->search_key = NULL;
+}
-/*
- * The following macros are used to create the ETS functions that only
- * need to access one element (e.g. db_put_catree, db_get_catree and
- * db_erase_catree).
- */
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeNode *base_node,
+ DbTableCATreeNode *parent,
+ int current_level)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else {
+ wlock_base_node(base_node);
+ iter->bnode_parent = parent;
+ iter->bnode_level = current_level;
+ }
+ iter->locked_bnode = base_node;
+}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
- int retry; \
- DbTableCATreeNode *current_node; \
- DbTableCATreeNode *prev_node; \
- DbTableCATreeBaseNode *base_node; \
- int current_level; \
- (void)prev_node; \
- do { \
- retry = 0; \
- current_node = GET_ROOT_ACQB(tb); \
- prev_node = NULL; \
- current_level = 0; \
- while ( ! current_node->is_base_node ) { \
- current_level = current_level + 1; \
- prev_node = current_node; \
- if (cmp_key_route(key,current_node) < 0) { \
- current_node = GET_LEFT_ACQB(current_node); \
- } else { \
- current_node = GET_RIGHT_ACQB(current_node); \
- } \
- } \
- base_node = &current_node->u.base; \
- LOCK(base_node); \
- if ( ! base_node->is_valid ) { \
- /* Retry */ \
- UNLOCK(base_node); \
- retry = 1; \
- } \
- } while(retry);
-
-
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
- && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(tb, prev_node, current_node); \
- } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
- join_catree(tb, prev_node, current_node); \
- } else { \
- wunlock_base_node(base_node); \
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else if (iter->locked_bnode->u.base.is_valid) {
+ wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
+ iter->bnode_parent, iter->bnode_level);
}
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+static ERTS_INLINE
+void destroy_root_iterator(CATreeRootIterator* iter)
+{
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+ if (iter->search_key)
+ erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
+}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
- result = SEQUENTAIL_CALL; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
- return result; \
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
+
+static ERTS_INLINE
+DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ if (fbn) {
+ fbn->parent = NULL;
+ fbn->current_level = 0;
}
+ while (!node->is_base_node) {
+ if (fbn) {
+ fbn->current_level++;
+ fbn->parent = node;
+ }
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return node;
+}
+static ERTS_INLINE
+DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
+{
+ DbTableCATreeNode* base_node;
-#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
- result = SEQUENTAIL_CALL; \
- runlock_base_node(base_node); \
- return result; \
+ while (1) {
+ base_node = find_base_node(tb, key, NULL);
+ rlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ runlock_base_node(base_node);
}
+ return base_node;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* base_node;
+ while (1) {
+ base_node = find_base_node(tb, key, fbn);
+ wlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ wunlock_base_node(base_node);
+ }
+ return base_node;
+}
static ERTS_INLINE
-void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
{
dst->size = key_size;
if (key_size != 0) {
@@ -798,6 +879,7 @@ void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
dst->term = key;
dst->oh = NULL;
}
+ return dst->term;
}
static ERTS_INLINE
@@ -812,27 +894,21 @@ void destroy_route_key(DbRouteKey* key)
#ifdef ERTS_ENABLE_LOCK_CHECK
-# define sizeof_base_node(KEY_SZ) \
- (offsetof(DbTableCATreeNode, u.base.lc_key.heap) \
- + (KEY_SZ)*sizeof(Eterm))
# define LC_ORDER(ORDER) ORDER
#else
-# define sizeof_base_node(KEY_SZ) \
- offsetof(DbTableCATreeNode, u.base.end_of_struct__)
# define LC_ORDER(ORDER) NIL
#endif
+#define sizeof_base_node() \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+
static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
- TreeDbTerm* root,
- Eterm lc_key)
+ TreeDbTerm* root)
{
DbTableCATreeNode *p;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- Eterm lc_key_size = size_object(lc_key);
-#endif
p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
- sizeof_base_node(lc_key_size));
+ sizeof_base_node());
p->is_base_node = 1;
p->u.base.root = root;
@@ -841,32 +917,16 @@ static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
-#ifdef ERTS_ENABLE_LOCK_CHECK
- copy_route_key(&p->u.base.lc_key, lc_key, lc_key_size);
-#endif
erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
"erl_db_catree_base_node",
- lc_key,
+ NIL,
ERTS_LOCK_FLAGS_CATEGORY_DB);
- p->u.base.lock_statistics = 0;
+ p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT)
+ ? INT_MAX : 0);
p->u.base.is_valid = 1;
return p;
}
-static ERTS_INLINE
-DbTableCATreeNode *create_wlocked_base_node(DbTableCATree *tb,
- TreeDbTerm* root,
- Eterm lc_key)
-{
- DbTableCATreeNode* p = create_base_node(tb, root, lc_key);
- ethr_rwmutex_rwlock(&p->u.base.lock.rwmtx);
-#ifdef ERTS_ENABLE_LOCK_CHECK
- erts_lc_trylock_flg(-1, &p->u.base.lock.lc, ERTS_LOCK_OPTIONS_RDWR);
-#endif
- return p;
-}
-
-
static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
{
return (offsetof(DbTableCATreeNode, u.route.key.heap)
@@ -913,16 +973,13 @@ static void do_free_base_node(void* vptr)
DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
ASSERT(p->is_base_node);
erts_rwmtx_destroy(&p->u.base.lock);
-#ifdef ERTS_ENABLE_LOCK_CHECK
- destroy_route_key(&p->u.base.lc_key);
-#endif
erts_free(ERTS_ALC_T_DB_TABLE, p);
}
static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
{
ASSERT(p->is_base_node);
- ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(p->u.base.lc_key.size), 0);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
do_free_base_node(p);
}
@@ -1014,154 +1071,6 @@ rightmost_route_node(DbTableCATreeNode *root)
return prev_node;
}
-static ERTS_INLINE DbTableCATreeNode*
-leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
-{
- DbTableCATreeNode * node = root;
- while (!node->is_base_node) {
- PUSH_NODE(stack, node);
- node = GET_LEFT_ACQB(node);
- }
- return node;
-}
-
-static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCATreeNode *base_node,
- CATreeNodeStack *stack)
-{
- if (EMPTY_NODE(stack)) { /* The parent of b is the root */
- return NULL;
- } else {
- if (GET_LEFT(TOP_NODE(stack)) == base_node) {
- return leftmost_base_node_and_path(
- GET_RIGHT_ACQB(TOP_NODE(stack)),
- stack);
- } else {
- Eterm pkey =
- TOP_NODE(stack)->u.route.key.term; /* pKey = key of parent */
- POP_NODE(stack);
- while (!EMPTY_NODE(stack)) {
- if (TOP_NODE(stack)->u.route.is_valid &&
- cmp_key_route(pkey, TOP_NODE(stack)) <= 0) {
- return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
- } else {
- POP_NODE(stack);
- }
- }
- }
- return NULL;
- }
-}
-
-static ERTS_INLINE void
-clone_stack(CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *search_stack_copy_ptr)
-{
- int i;
- search_stack_copy_ptr->pos = search_stack_ptr->pos;
- for (i = 0; i < search_stack_ptr->pos; i++) {
- search_stack_copy_ptr->array[i] = search_stack_ptr->array[i];
- }
-}
-
-
-
-static ERTS_INLINE DbTableCATreeNode*
-lock_first_base_node(DbTable *tbl,
- CATreeNodeStack *search_stack_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- while (1) {
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- current_node = GET_LEFT_ACQB(current_node);
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if (base_node->is_valid)
- break;
- /* Retry */
- runlock_base_node(base_node);
- search_stack_ptr->pos = 0;
- }
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return current_node;
-}
-
-static ERTS_INLINE DbTableCATreeBaseNode*
-find_and_lock_next_base_node_and_path(DbTable *tbl,
- CATreeNodeStack **search_stack_ptr_ptr,
- CATreeNodeStack **search_stack_copy_ptr_ptr,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- CATreeNodeStack * tmp_stack_ptr;
-
- while (1) {
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if (base_node->is_valid)
- break;
-
- /* Retry */
- runlock_base_node(base_node);
- /* Revert to previous state */
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- tmp_stack_ptr = *search_stack_ptr_ptr;
- *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
- *search_stack_copy_ptr_ptr = tmp_stack_ptr;
- }
-
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return base_node;
-}
-
-static ERTS_INLINE
-void unlock_and_release_locked_base_node_stack(DbTable *tbl,
- CATreeNodeStack *locked_base_nodes_stack_ptr)
-{
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- int i;
- for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
- current_node = locked_base_nodes_stack_ptr->array[i];
- base_node = &current_node->u.base;
- if (locked_base_nodes_stack_ptr->pos > 1) {
- base_node->lock_statistics = /* This is not atomic which is fine as */
- base_node->lock_statistics + /* correctness does not depend on that. */
- ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION;
- }
- runlock_base_node(base_node);
- }
- if (locked_base_nodes_stack_ptr->size > STACK_NEED) {
- erts_db_free(ERTS_ALC_T_DB_STK, tbl,
- locked_base_nodes_stack_ptr->array,
- sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size);
- }
-}
-
-static ERTS_INLINE
-void init_stack(CATreeNodeStack *stack,
- DbTableCATreeNode **stack_array,
- Uint init_size)
-{
- stack->array = stack_array;
- stack->pos = 0;
- stack->size = init_size;
-}
-
static ERTS_INLINE
void init_tree_stack(DbTreeStack *stack,
TreeDbTerm **stack_array,
@@ -1172,194 +1081,9 @@ void init_tree_stack(DbTreeStack *stack,
stack->slot = init_slot;
}
-#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \
- CATreeNodeStack STACK_NAME; \
- CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \
- DbTableCATreeNode *STACK_NAME##_array[STACK_NEED];
-
-#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \
-DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \
-init_stack(&search_stack, search_stack_array, 0); \
-init_stack(&search_stack_copy, search_stack_copy_array, 0); \
-init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/
-
-/*
- * Locks and returns the base node that contains the specified key if
- * it is present. The function saves the search path to the found base
- * node in search_stack_ptr and adds the found base node to
- * locked_base_nodes_stack_ptr.
- */
-static ERTS_INLINE DbTableCATreeBaseNode *
-lock_base_node_with_key(DbTable *tbl,
- Eterm key,
- CATreeNodeStack * search_stack_ptr,
- CATreeNodeStack * locked_base_nodes_stack_ptr)
-{
- int retry;
- DbTableCATreeNode *current_node;
- DbTableCATreeBaseNode *base_node;
- DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
- current_node = GET_ROOT_ACQB(tb);
- while ( ! current_node->is_base_node ) {
- PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(key,current_node) < 0 ) {
- current_node = GET_LEFT_ACQB(current_node);
- } else {
- current_node = GET_RIGHT_ACQB(current_node);
- }
- }
- base_node = &current_node->u.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
- return base_node;
-}
-
-/*
- * Joins a base node with it's right neighbor. Returns the base node
- * resulting from the join in locked state or NULL if there is no base
- * node to join with.
- */
-static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCATree *tb,
- DbTableCATreeNode *parent,
- DbTableCATreeNode *thiz,
- DbTableCATreeNode **result_parent_wb)
-{
- DbTableCATreeNode *gparent;
- DbTableCATreeNode *neighbor;
- DbTableCATreeNode *new_neighbor;
- DbTableCATreeNode *neighbor_parent;
- TreeDbTerm* new_root;
-
- if (parent == NULL) {
- return NULL;
- }
- ASSERT(thiz == GET_LEFT(parent));
- while (1) {
- neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
- wlock_base_node_no_stats(&neighbor->u.base);
- if (neighbor->u.base.is_valid)
- break;
- wunlock_base_node(&neighbor->u.base);
- }
- lock_route_node(parent);
- parent->u.route.is_valid = 0;
- neighbor->u.base.is_valid = 0;
- thiz->u.base.is_valid = 0;
- while (1) {
- gparent = parent_of(tb, parent);
- if (gparent == NULL)
- break;
- lock_route_node(gparent);
- if (gparent->u.route.is_valid)
- break;
- unlock_route_node(gparent);
- }
- if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent));
- } else if (GET_LEFT(gparent) == parent) {
- SET_LEFT_RELB(gparent, GET_RIGHT(parent));
- } else {
- SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
- }
- unlock_route_node(parent);
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
-
- new_root = join_trees(thiz->u.base.root, neighbor->u.base.root);
- new_neighbor = create_wlocked_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
-
- if (GET_RIGHT(parent) == neighbor) {
- neighbor_parent = gparent;
- } else {
- neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
- }
- if(neighbor_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor);
- } else if (GET_LEFT(neighbor_parent) == neighbor) {
- SET_LEFT_RELB(neighbor_parent, new_neighbor);
- } else {
- SET_RIGHT_RELB(neighbor_parent, new_neighbor);
- }
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
- /* Free the parent and base */
- erts_schedule_db_free(&tb->common,
- do_free_route_node,
- parent,
- &parent->u.route.free_item,
- sizeof_route_node(parent->u.route.key.size));
- erts_schedule_db_free(&tb->common,
- do_free_base_node,
- thiz,
- &thiz->u.base.free_item,
- sizeof_base_node(thiz->u.base.lc_key.size));
- erts_schedule_db_free(&tb->common,
- do_free_base_node,
- neighbor,
- &neighbor->u.base.free_item,
- sizeof_base_node(neighbor->u.base.lc_key.size));
-
- *result_parent_wb = neighbor_parent;
- return new_neighbor;
-}
-
-/*
- * Used to merge together all base nodes for operations such as last
- * and select_*. Returns the base node resulting from the merge in
- * locked state.
- */
-static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCATree* tb)
-{
- DbTableCATreeNode *parent;
- DbTableCATreeNode *new_parent;
- DbTableCATreeNode *base;
- DbTableCATreeNode *new_base;
- int is_not_valid;
- /* Find first base node */
- do {
- parent = NULL;
- base = GET_ROOT_ACQB(tb);
- while ( ! base->is_base_node ) {
- parent = base;
- base = GET_LEFT_ACQB(base);
- }
- wlock_base_node_no_stats(&(base->u.base));
- is_not_valid = ! base->u.base.is_valid;
- if (is_not_valid) {
- wunlock_base_node(&(base->u.base));
- }
- } while(is_not_valid);
- do {
- new_base = erl_db_catree_force_join_right(tb,
- parent,
- base,
- &new_parent);
- if (new_base != NULL) {
- base = new_base;
- parent = new_parent;
- }
- } while(new_base != NULL);
- return base;
-}
-
-
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent,
- DbTableCATreeNode *thiz)
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent)
{
DbTableCATreeNode *gparent;
DbTableCATreeNode *neighbor;
@@ -1369,7 +1093,7 @@ static void join_catree(DbTableCATree *tb,
ASSERT(thiz->is_base_node);
if (parent == NULL) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
}
ASSERT(!parent->is_base_node);
@@ -1378,12 +1102,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1414,8 +1138,7 @@ static void join_catree(DbTableCATree *tb,
{
TreeDbTerm* new_root = join_trees(thiz->u.base.root,
neighbor->u.base.root);
- new_neighbor = create_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
+ new_neighbor = create_base_node(tb, new_root);
}
if (GET_RIGHT(parent) == neighbor) {
neighbor_parent = gparent;
@@ -1429,12 +1152,12 @@ static void join_catree(DbTableCATree *tb,
if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(thiz);
return;
} else if (!neighbor->u.base.is_valid) {
thiz->u.base.lock_statistics = 0;
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
return;
} else {
lock_route_node(parent);
@@ -1467,8 +1190,7 @@ static void join_catree(DbTableCATree *tb,
{
TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
thiz->u.base.root);
- new_neighbor = create_base_node(tb, new_root,
- LC_ORDER(thiz->u.base.lc_key.term));
+ new_neighbor = create_base_node(tb, new_root);
}
if (GET_LEFT(parent) == neighbor) {
neighbor_parent = gparent;
@@ -1486,8 +1208,8 @@ static void join_catree(DbTableCATree *tb,
} else {
SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(&thiz->u.base);
- wunlock_base_node(&neighbor->u.base);
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
/* Free the parent and base */
erts_schedule_db_free(&tb->common,
do_free_route_node,
@@ -1498,25 +1220,27 @@ static void join_catree(DbTableCATree *tb,
do_free_base_node,
thiz,
&thiz->u.base.free_item,
- sizeof_base_node(thiz->u.base.lc_key.size));
+ sizeof_base_node());
erts_schedule_db_free(&tb->common,
do_free_base_node,
neighbor,
&neighbor->u.base.free_item,
- sizeof_base_node(neighbor->u.base.lc_key.size));
+ sizeof_base_node());
}
static void split_catree(DbTableCATree *tb,
- DbTableCATreeNode* ERTS_RESTRICT parent,
- DbTableCATreeNode* ERTS_RESTRICT base) {
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent)
+{
TreeDbTerm *splitOutWriteBack;
DbTableCATreeNode* ERTS_RESTRICT new_left;
DbTableCATreeNode* ERTS_RESTRICT new_right;
DbTableCATreeNode* ERTS_RESTRICT new_route;
if (less_than_two_elements(base->u.base.root)) {
- base->u.base.lock_statistics = 0;
- wunlock_base_node(&base->u.base);
+ if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(base);
return;
} else {
TreeDbTerm *left_tree;
@@ -1525,10 +1249,8 @@ static void split_catree(DbTableCATree *tb,
split_tree(tb, base->u.base.root, &splitOutWriteBack,
&left_tree, &right_tree);
- new_left = create_base_node(tb, left_tree,
- LC_ORDER(GETKEY(tb, left_tree->dbterm.tpl)));
- new_right = create_base_node(tb, right_tree,
- LC_ORDER(GETKEY(tb, right_tree->dbterm.tpl)));
+ new_left = create_base_node(tb, left_tree);
+ new_right = create_base_node(tb, right_tree);
new_route = create_route_node(tb,
new_left,
new_right,
@@ -1542,12 +1264,12 @@ static void split_catree(DbTableCATree *tb,
SET_RIGHT_RELB(parent, new_route);
}
base->u.base.is_valid = 0;
- wunlock_base_node(&base->u.base);
+ wunlock_base_node(base);
erts_schedule_db_free(&tb->common,
do_free_base_node,
base,
&base->u.base.free_item,
- sizeof_base_node(base->u.base.lc_key.size));
+ sizeof_base_node());
}
}
@@ -1675,7 +1397,9 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL);
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1684,162 +1408,416 @@ int db_create_catree(Process *p, DbTable *tbl)
static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATreeBaseNode *base_node;
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
int result;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- /* Find first base node */
- base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base);
- /* Find next base node until non-empty base node is found */
- while (base_node != NULL && base_node->root == NULL) {
- base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_first_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
+ root = pp ? *pp : NULL;
}
- /* Get the return value */
- result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL);
- /* Unlock base nodes */
- unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr);
+
+ result = db_first_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
- DbTreeStack next_search_stack;
- TreeDbTerm *next_search_stack_array[STACK_NEED];
- DbTableCATreeBaseNode *base_node;
- int result = 0;
- DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
- init_tree_stack(&next_search_stack, next_search_stack_array, 0);
- /* Lock base node with key */
- base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack);
- /* Continue until key is not >= greatest key in base_node */
- while (base_node != NULL) {
- result = db_next_tree_common(p, tbl, base_node->root, key,
- ret, &next_search_stack);
- if (result != DB_ERROR_NONE || *ret != am_EOT) {
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_next_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
break;
- }
- base_node =
- find_and_lock_next_base_node_and_path(tbl,
- &search_stack_ptr,
- &search_stack_copy_ptr,
- locked_base_nodes_stack_ptr);
- }
- unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack);
+
+ rootp = catree_find_next_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_last_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
+ root = pp ? *pp : NULL;
+ }
+
+ result = db_last_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
return result;
-
}
static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
int result;
- DbTableCATreeNode *base;
- init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
- wunlock_base_node(&(base->u.base));
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_prev_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
+ &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
+ break;
+ rootp = catree_find_prev_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
- ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
- db_put_tree_common(&tb->common,
- &base_node->root,
- obj,
- key_clash_fail,
- NULL);)
-
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(obj));
- return erl_db_catree_do_operation_put(tb,
- key,
- obj,
- key_clash_fail);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
+ key_clash_fail, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
- ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
- db_get_tree_common(p, &tb->common,
- base_node->root,
- key, ret, NULL);)
-
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get(tb, key, p, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
+ if (base_node->u.base.is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->u.base.root;
+}
+
+
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static Eterm copy_iter_search_key(CATreeRootIterator* iter, Eterm key)
+{
+ Uint key_size;
+
+ if (is_immed(key))
+ return key;
+
+ if (iter->search_key) {
+ ASSERT(key != iter->search_key->term);
+ destroy_route_key(iter->search_key);
+ }
+ key_size = size_object(key);
+ if (!iter->search_key || key_size > iter->search_key->size) {
+ iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
+ iter->search_key,
+ (offsetof(DbRouteKey, heap)
+ + key_size*sizeof(Eterm)));
+ }
+ return copy_route_key(iter->search_key, key, key_size);
}
-#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
- ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
- db_member_tree_common(&tb->common,
- base_node->root,
- key,
- ret,
- NULL);)
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, int next,
+ Eterm *keyp)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_invalid = NULL;
+ DbTableCATreeNode *rejected_empty = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ Eterm key = iter->next_route_key;
+ int current_level;
+
+ if (iter->locked_bnode) {
+ if (keyp)
+ *keyp = copy_iter_search_key(iter, *keyp);
+ unlock_iter_base_node(iter);
+ }
+
+ if (is_non_value(key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (next) {
+ if (cmp_key_route(key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_invalid);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ ASSERT(node != rejected_empty);
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = node;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ key = next_route_node->u.route.key.term;
+ IF_DEBUG(rejected_empty = node);
+ }
+ else
+ IF_DEBUG(rejected_invalid = node);
+
+ /* Retry */
+ unlock_iter_base_node(iter);
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 1, keyp);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 0, keyp);
+}
+
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, next_route_node, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_member(tb, key, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_member_tree_common(&tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
- ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
- db_get_element_tree_common(p, &tb->common,
- base_node->root,
- key, ndex,
- ret, NULL))
-
static int db_get_element_catree(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_element_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ndex, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
- ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
- db_erase_tree_common(tbl,
- &base_node->root,
- key,
- ret,
- NULL);)
-
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_tree_common(tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
- ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
- db_erase_object_tree_common(tbl,
- &base_node->root,
- object,
- ret,
- NULL);)
-
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(object));
- return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_object_tree_common(tbl,
+ &node->u.base.root,
+ object,
+ ret,
+ NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
@@ -1847,11 +1825,12 @@ static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_slot_tree_common(p, tbl, base->u.base.root,
- slot_term, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
+ slot_term, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1861,25 +1840,25 @@ static int db_select_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, reverse, ret,
- NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1889,12 +1868,13 @@ static int db_select_count_continue_catree(Process *p,
Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1902,11 +1882,12 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1915,11 +1896,13 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
int reversed, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
- tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1931,12 +1914,13 @@ static int db_select_delete_continue_catree(Process *p,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
- continuation, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1946,12 +1930,14 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
- DbTableCATreeNode *base;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
- tid, pattern, ret, &stack);
- wunlock_base_node(&(base->u.base));
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1959,12 +1945,12 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_tree_common(p, &tbl->common,
- &base->u.base.root,
- tid, pattern, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
@@ -1972,26 +1958,24 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
int result;
- DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tbl->catree);
- result = db_select_replace_continue_tree_common(p, &tbl->common,
- &base->u.base.root,
- continuation, ret, NULL);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
- ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
- db_take_tree_common(p, tbl,
- &base_node->root,
- key, ret, NULL);)
-
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
/*
@@ -2003,9 +1987,16 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_print_tree_common(to, to_arg, show, *root, tbl);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
/* Release all memory occupied by a single table */
@@ -2081,25 +2072,33 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
- db_foreach_offheap_tree_common(base->u.base.root, func, arg);
- wunlock_base_node(&(base->u.base));
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_foreach_offheap_tree_common(*root, func, arg);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
DbUpdateHandle *handle)
{
DbTableCATree *tb = &tbl->catree;
- int res;
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
- res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
+ obj, handle, NULL);
if (res == 0) {
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
} else {
/* db_finalize_dbterm_catree will unlock */
- handle->lck = prev_node;
- handle->lck2 = current_node;
- handle->current_level = current_level;
+ handle->u.catree.base_node = node;
+ handle->u.catree.parent = fbn.parent;
+ handle->u.catree.current_level = fbn.current_level;
}
return res;
}
@@ -2107,12 +2106,10 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob
static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
{
DbTableCATree *tb = &(handle->tb->catree);
- DbTableCATreeNode *prev_node = handle->lck;
- DbTableCATreeNode *current_node = handle->lck2;
- int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->u.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ wunlock_adapt_base_node(tb, handle->u.catree.base_node,
+ handle->u.catree.parent,
+ handle->u.catree.current_level);
return;
}
@@ -2146,6 +2143,61 @@ void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
+void db_catree_force_split(DbTableCATree* tb, int on)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(tb, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+
+ if (on)
+ tb->common.status |= DB_CATREE_FORCE_SPLIT;
+ else
+ tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
+}
+
+void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
+{
+ DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
+ DbTableCATreeNode* node;
+ Uint depth = 0;
+
+ stats->route_nodes = 0;
+ stats->base_nodes = 0;
+ stats->max_depth = 0;
+
+ node = GET_ROOT(tb);
+ do {
+ while (!node->is_base_node) {
+ stats->route_nodes++;
+ ASSERT(depth < sizeof(stack)/sizeof(*stack));
+ stack[depth++] = node; /* PUSH parent */
+ if (stats->max_depth < depth)
+ stats->max_depth = depth;
+ node = GET_LEFT(node);
+ }
+ stats->base_nodes++;
+
+ while (depth > 0) {
+ DbTableCATreeNode* parent = stack[depth-1];
+ if (node == GET_LEFT(parent)) {
+ node = GET_RIGHT(parent);
+ break;
+ }
+ else {
+ ASSERT(node == GET_RIGHT(parent));
+ node = parent;
+ depth--; /* POP parent */
+ }
+ }
+ } while (depth > 0);
+}
#ifdef HARDDEBUG
diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h
index f9c0784289..418837be8e 100644
--- a/erts/emulator/beam/erl_db_catree.h
+++ b/erts/emulator/beam/erl_db_catree.h
@@ -48,9 +48,6 @@ typedef struct {
ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */
struct DbTableCATreeNode * next; /* Used when gradually deleting */
-#ifdef ERTS_ENABLE_LOCK_CHECK
- DbRouteKey lc_key;
-#endif
char end_of_struct__;
} DbTableCATreeBaseNode;
@@ -92,13 +89,45 @@ typedef struct db_table_catree {
int is_routing_nodes_freed;
} DbTableCATree;
+typedef struct {
+ DbTableCATree* tb;
+ Eterm next_route_key;
+ DbTableCATreeNode* locked_bnode;
+ DbTableCATreeNode* bnode_parent;
+ int bnode_level;
+ int read_only;
+ DbRouteKey* search_key;
+} CATreeRootIterator;
+
+
void db_initialize_catree(void);
int db_create_catree(Process *p, DbTable *tbl);
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator*);
+
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm key, CATreeRootIterator*);
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, CATreeRootIterator*);
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator*, int next, Eterm* keyp);
+TreeDbTerm** catree_find_next_root(CATreeRootIterator*, Eterm* keyp);
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator*, Eterm* keyp);
+TreeDbTerm** catree_find_first_root(CATreeRootIterator*);
+TreeDbTerm** catree_find_last_root(CATreeRootIterator*);
+
+
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable);
#endif
+void db_catree_force_split(DbTableCATree*, int on);
+
+typedef struct {
+ Uint route_nodes;
+ Uint base_nodes;
+ Uint max_depth;
+} DbCATreeStats;
+void db_calc_stats_catree(DbTableCATree*, DbCATreeStats*);
+
+
#endif /* _DB_CATREE_H */
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index 61806876a7..f05a3b51c9 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -3103,7 +3103,7 @@ Ldone:
handle->dbterm = &b->dbterm;
handle->flags = flags;
handle->new_size = b->dbterm.size;
- handle->lck = lck;
+ handle->u.hash.lck = lck;
return 1;
}
@@ -3116,7 +3116,7 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
DbTableHash *tb = &tbl->hash;
HashDbTerm **bp = (HashDbTerm **) handle->bp;
HashDbTerm *b = *bp;
- erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck;
+ erts_rwmtx_t* lck = handle->u.hash.lck;
HashDbTerm* free_me = NULL;
ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index c6f1a0fc6d..f5fac9dcb6 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -70,8 +70,10 @@
*/
ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
{
- if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
- return &tb->static_stack;
+ if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
+ if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1))
+ return &tb->static_stack;
}
return NULL;
}
@@ -82,9 +84,10 @@ ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
{
DbTreeStack* stack;
- if (stack_container != NULL &&
- !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) {
- return &stack_container->static_stack;
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (!erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1))
+ return &stack_container->static_stack;
}
stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
@@ -96,14 +99,16 @@ static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
{
- if (stack_container != NULL && stack == &stack_container->static_stack) {
- ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
- erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
- }
- else {
- erts_db_free(ERTS_ALC_T_DB_STK, tb,
- (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
+ if (stack_container != NULL) {
+ ASSERT(IS_TREE_TABLE(stack_container->common.type));
+ if (stack == &stack_container->static_stack) {
+ ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
+ erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
+ return;
+ }
}
+ erts_db_free(ERTS_ALC_T_DB_STK, tb,
+ (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
}
static ERTS_INLINE void reset_stack(DbTreeStack* stack)
@@ -117,6 +122,7 @@ static ERTS_INLINE void reset_stack(DbTreeStack* stack)
static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
if (tb != NULL) {
+ ASSERT(IS_TREE_TABLE(tb->common.type));
reset_stack(&tb->static_stack);
}
}
@@ -185,31 +191,37 @@ static void do_dump_tree2(DbTableTree*, int to, void *to_arg, int show,
** Datatypes
*/
+enum ms_key_boundness {
+ /* Order significant, larger means more "boundness" => less iteration */
+ MS_KEY_UNBOUND = 0,
+ MS_KEY_PARTIALLY_BOUND = 1,
+ MS_KEY_BOUND = 2,
+ MS_KEY_IMPOSSIBLE = 3
+};
+
/*
* This structure is filled in by analyze_pattern() for the select
* functions.
*/
struct mp_info {
- int something_can_match; /* The match_spec is not "impossible" */
- int some_limitation; /* There is some limitation on the search
- * area, i. e. least and/or most is set.*/
- int got_partial; /* The limitation has a partially bound
- * key */
+ enum ms_key_boundness key_boundness;
Eterm least; /* The lowest matching key (possibly
* partially bound expression) */
Eterm most; /* The highest matching key (possibly
* partially bound expression) */
-
- TreeDbTerm **save_term; /* If the key is completely bound, this
- * will be the Tree node we're searching
- * for, otherwise it will be useless */
Binary *mp; /* The compiled match program */
};
+struct select_common {
+ TreeDbTerm **root;
+};
+
+
/*
* Used by doit_select(_chunk)
*/
struct select_context {
+ struct select_common common;
Process *p;
Eterm accum;
Binary *mp;
@@ -225,6 +237,7 @@ struct select_context {
* Used by doit_select_count
*/
struct select_count_context {
+ struct select_common common;
Process *p;
Binary *mp;
Eterm end_condition;
@@ -238,9 +251,9 @@ struct select_count_context {
* Used by doit_select_delete
*/
struct select_delete_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
DbTreeStack *stack;
Uint accum;
Binary *mp;
@@ -255,9 +268,9 @@ struct select_delete_context {
* Used by doit_select_replace
*/
struct select_replace_context {
+ struct select_common common;
Process *p;
DbTableCommon *tb;
- TreeDbTerm **root;
Binary *mp;
Eterm end_condition;
Eterm *lastobj;
@@ -282,7 +295,8 @@ int tree_balance_left(TreeDbTerm **this);
int tree_balance_right(TreeDbTerm **this);
static int delsub(TreeDbTerm **this);
static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
- DbTable *tb, DbTableTree *stack_container);
+ DbTable *tb, DbTableTree *stack_container,
+ CATreeRootIterator *iter);
static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
Eterm key, DbTableTree *stack_container);
static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
@@ -292,64 +306,62 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+static TreeDbTerm *find_prev_from_pb_key(DbTable*, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator*);
+typedef int traverse_doit_funcT(DbTableCommon*, TreeDbTerm*,
+ struct select_common*, int forward);
+
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableCommon *tb,
- TreeDbTerm *,
- void *,
- int),
- void *context);
+ traverse_doit_funcT*,
+ struct select_common *context,
+ CATreeRootIterator*);
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
int (*doit)(DbTableCommon *tb,
TreeDbTerm **, // out
- void *,
+ struct select_common*,
int),
- void *context);
-static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
- TreeDbTerm ***ret, Eterm *partly_bound);
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+ struct select_common*,
+ CATreeRootIterator*);
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
-static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
static int doit_select(DbTableCommon *tb,
- TreeDbTerm *this,
- void *ptr,
+ TreeDbTerm *this,
+ struct select_common* ptr,
int forward);
static int doit_select_count(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_chunk(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_delete(DbTableCommon *tb,
TreeDbTerm *this,
- void *ptr,
+ struct select_common*,
int forward);
static int doit_select_replace(DbTableCommon *tb,
TreeDbTerm **this_ptr,
- void *ptr,
+ struct select_common*,
int forward);
static int partly_bound_can_match_lesser(Eterm partly_bound_1,
@@ -536,7 +548,7 @@ int db_next_tree_common(Process *p, DbTable *tbl,
{
TreeDbTerm *this;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
this = find_next(&tbl->common, root, stack, key);
if (this == NULL) {
@@ -594,7 +606,7 @@ int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
{
TreeDbTerm *this;
- if (is_atom(key) && key == am_EOT)
+ if (key == am_EOT)
return DB_ERROR_BADKEY;
this = find_prev(&tbl->common, root, stack, key);
if (this == NULL) {
@@ -862,7 +874,8 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
Eterm slot_term, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter)
{
Sint slot;
TreeDbTerm *st;
@@ -892,7 +905,7 @@ int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
* are counted from 1 and up.
*/
++slot;
- st = slot_search(p, root, slot, tbl, stack_container);
+ st = slot_search(p, root, slot, tbl, stack_container, iter);
if (st == NULL) {
*ret = am_false;
return DB_ERROR_UNSPEC;
@@ -910,7 +923,7 @@ static int db_slot_tree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb);
+ return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb, NULL);
}
@@ -981,10 +994,10 @@ static BIF_RETTYPE bif_trap3(Export *bif,
int db_select_continue_tree_common(Process *p,
DbTableCommon *tb,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -998,7 +1011,6 @@ int db_select_continue_tree_common(Process *p,
Sint chunk_size;
Sint reverse;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple but not the arity or
@@ -1032,23 +1044,32 @@ int db_select_continue_tree_common(Process *p,
reverse = unsigned_val(tptr[7]);
sc.got = signed_val(tptr[8]);
- stack = get_any_stack((DbTable*)tb, stack_container);
- if (chunk_size) {
- if (reverse) {
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- } else {
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
- }
- } else {
- if (reverse) {
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
- } else {
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
- }
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
}
- release_stack((DbTable*)tb,stack_container,stack);
+ else
+ sc.common.root = &((DbTableTree*)tb)->root;
+
+ if (sc.common.root) {
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ if (chunk_size) {
+ if (reverse) {
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ } else {
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc.common, iter);
+ }
+ } else {
+ if (reverse) {
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ } else {
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc.common, iter);
+ }
+ }
+ release_stack((DbTable*)tb,stack_container,stack);
- BUMP_REDS(p, 1000 - sc.max);
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
if (chunk_size) {
@@ -1142,13 +1163,14 @@ static int db_select_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_continue_tree_common(p, &tb->common,
+ continuation, ret, tb, NULL);
}
-int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
/* Strategy: Traverse backwards to build resulting list from tail to head */
DbTreeStack* stack;
@@ -1179,44 +1201,64 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(NIL,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
RET_TO_BIF(sc.accum,DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
lastkey = GETKEY(tb, this->dbterm.tpl);
- }
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
@@ -1257,17 +1299,16 @@ int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_tree_common(p, &tb->common, &tb->root, tid,
- pattern, reverse, ret, tb);
+ return db_select_tree_common(p, tbl, tid,
+ pattern, reverse, ret, &tbl->tree, NULL);
}
int db_select_count_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tb,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1280,7 +1321,6 @@ int db_select_count_continue_tree_common(Process *p,
Eterm *tptr;
Eterm egot;
-
#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
/* Decode continuation. We know it's a tuple and everything else as
@@ -1305,18 +1345,28 @@ int db_select_count_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
if (is_big(tptr[5])) {
sc.got = big_to_uint32(tptr[5]);
} else {
sc.got = unsigned_val(tptr[5]);
}
- stack = get_any_stack((DbTable*)tb, stack_container);
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tb->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ stack = get_any_stack(tb, stack_container);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
+ release_stack(tb,stack_container,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
@@ -1361,14 +1411,15 @@ static int db_select_count_continue_tree(Process *p,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, tb, NULL);
}
-int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_count_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_count_context sc;
@@ -1383,7 +1434,6 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
Eterm egot;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1398,35 +1448,48 @@ int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &((DbTable*)tb)->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_count(&tb->common, this, &sc.common, 0 /* dummy */);
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
}
stack = get_any_stack((DbTable*)tb, stack_container);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
- traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_count, &sc.common, iter);
release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1467,15 +1530,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_count_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, tb, NULL);
}
-int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_context sc;
@@ -1489,7 +1553,6 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
int errcode;
Eterm mpb;
-
#define RET_TO_BIF(Term,RetVal) do { \
if (mpi.mp != NULL) { \
erts_bin_free(mpi.mp); \
@@ -1505,24 +1568,30 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tb->common.keypos;
sc.got = 0;
sc.chunk_size = chunk_size;
- if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tb->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(am_EOT,DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */);
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tb->tree.root;
+ this = find_node(&tb->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select(&tb->common, this, &sc.common, 0 /* direction doesn't matter */);
if (sc.accum != NIL) {
hp=HAlloc(p, 3);
RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
@@ -1533,21 +1602,35 @@ int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root
stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tb, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_backwards(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
} else {
- if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_prev_from_pb_key(tb, &sc.common.root, stack, mpi.least, iter);
+ if (this)
+ lastkey = GETKEY(tb, this->dbterm.tpl);
sc.end_condition = mpi.most;
}
- traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_first_root(iter);
+ else
+ sc.common.root = &tb->tree.root;
+ }
+ traverse_forward(&tb->common, stack, lastkey, &doit_select_chunk, &sc.common, iter);
}
release_stack((DbTable*)tb,stack_container,stack);
@@ -1624,18 +1707,18 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_chunk_tree_common(p, &tb->common, &tb->root,
+ return db_select_chunk_tree_common(p, tbl,
tid, pattern, chunk_size,
- reverse, ret, tb);
+ reverse, ret, tb, NULL);
}
int db_select_delete_continue_tree_common(Process *p,
DbTable *tbl,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
unsigned sz;
@@ -1647,7 +1730,6 @@ int db_select_delete_continue_tree_common(Process *p,
Eterm *tptr;
Eterm eaccsum;
-
#define RET_TO_BIF(Term, State) do { \
if (sc.erase_lastterm) { \
free_term(tbl, sc.lastterm); \
@@ -1670,7 +1752,6 @@ int db_select_delete_continue_tree_common(Process *p,
mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
sc.p = p;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
if (is_big(tptr[5])) {
sc.accum = big_to_uint32(tptr[5]);
@@ -1682,9 +1763,19 @@ int db_select_delete_continue_tree_common(Process *p,
sc.max = 1000;
sc.keypos = tbl->common.keypos;
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
- BUMP_REDS(p, 1000 - sc.max);
+ if (sc.common.root) {
+ traverse_backwards(&tbl->common, stack, lastkey, &doit_select_delete, &sc.common, iter);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ }
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
@@ -1726,16 +1817,15 @@ static int db_select_delete_continue_tree(Process *p,
{
DbTableTree *tb = &tbl->tree;
ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
- return db_select_delete_continue_tree_common(p, tbl, &tb->root,
- continuation, ret,
- &tb->static_stack);
+ return db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &tb->static_stack, NULL);
}
int db_select_delete_tree_common(Process *p, DbTable *tbl,
- TreeDbTerm **root,
Eterm tid, Eterm pattern,
Eterm *ret,
- DbTreeStack* stack)
+ DbTreeStack* stack,
+ CATreeRootIterator* iter)
{
struct select_delete_context sc;
struct mp_info mpi;
@@ -1770,35 +1860,48 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl,
sc.end_condition = NIL;
sc.keypos = tbl->common.keypos;
sc.tb = &tbl->common;
- sc.root = root;
sc.stack = stack;
- if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(0,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ this = find_node(&tbl->common, *sc.common.root, mpi.least, NULL);
+ if (this)
+ doit_select_delete(&tbl->common, this, &sc.common, 0 /* direction doesn't
matter */);
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
}
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, stack, lastkey,
+ &doit_select_delete, &sc.common, iter);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
@@ -1842,16 +1945,16 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret,
- &tb->static_stack);
+ return db_select_delete_tree_common(p, tbl, tid, pattern, ret,
+ &tb->static_stack, NULL);
}
int db_select_replace_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tbl,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1888,7 +1991,7 @@ int db_select_replace_continue_tree_common(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
if (is_big(tptr[5])) {
sc.replaced = big_to_uint32(tptr[5]);
} else {
@@ -1896,9 +1999,18 @@ int db_select_replace_continue_tree_common(Process *p,
}
prev_replaced = sc.replaced;
- stack = get_any_stack((DbTable*)tb,stack_container);
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ if (iter) {
+ iter->next_route_key = lastkey;
+ sc.common.root = catree_find_prev_root(iter, NULL);
+ }
+ else {
+ sc.common.root = &tbl->tree.root;
+ }
+
+ stack = get_any_stack(tbl, stack_container);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl, stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
@@ -1906,7 +2018,7 @@ int db_select_replace_continue_tree_common(Process *p,
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
if (end_condition != NIL &&
(cmp_partly_bound(end_condition,key) > 0)) {
/* done anyway */
@@ -1942,14 +2054,14 @@ static int db_select_replace_continue_tree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_continue_tree_common(p, &tb->common, &tb->root,
- continuation, ret, tb);
+ return db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ &tbl->tree, NULL);
}
-int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+int db_select_replace_tree_common(Process *p, DbTable *tbl,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter)
{
DbTreeStack* stack;
struct select_replace_context sc;
@@ -1977,49 +2089,64 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
sc.lastobj = NULL;
sc.p = p;
- sc.tb = tb;
- sc.root = root;
+ sc.tb = &tbl->common;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->keypos;
+ sc.keypos = tbl->common.keypos;
sc.replaced = 0;
- if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
- if (!mpi.something_can_match) {
+ if (mpi.key_boundness == MS_KEY_IMPOSSIBLE) {
RET_TO_BIF(make_small(0),DB_ERROR_NONE);
/* can't possibly match anything */
}
sc.mp = mpi.mp;
- if (!mpi.got_partial && mpi.some_limitation &&
- CMP_EQ(mpi.least,mpi.most)) {
- doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */);
- reset_static_stack(stack_container); /* may refer replaced term */
+ if (mpi.key_boundness == MS_KEY_BOUND) {
+ TreeDbTerm** pp;
+ ASSERT(CMP_EQ(mpi.least, mpi.most));
+ if (iter)
+ sc.common.root = catree_find_root(mpi.least, iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ pp = find_node2(&tbl->common, sc.common.root, mpi.least);
+ if (pp) {
+ doit_select_replace(&tbl->common, pp, &sc.common, 0 /* dummy */);
+ reset_static_stack(stack_container); /* may refer replaced term */
+ }
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- stack = get_any_stack((DbTable*)tb,stack_container);
+ stack = get_any_stack(tbl,stack_container);
- if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
- }
+ if (mpi.key_boundness == MS_KEY_PARTIALLY_BOUND) {
+ this = find_next_from_pb_key(tbl, &sc.common.root, stack, mpi.most, iter);
+ if (this)
+ lastkey = GETKEY(tbl, this->dbterm.tpl);
sc.end_condition = mpi.least;
}
+ else {
+ ASSERT(mpi.key_boundness == MS_KEY_UNBOUND);
+ if (iter)
+ sc.common.root = catree_find_last_root(iter);
+ else
+ sc.common.root = &tbl->tree.root;
+ }
- traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
- release_stack((DbTable*)tb,stack_container,stack);
+ traverse_update_backwards(&tbl->common, stack, lastkey, &doit_select_replace,
+ &sc.common, iter);
+ release_stack(tbl,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
- key = GETKEY(tb, sc.lastobj);
+ key = GETKEY(tbl, sc.lastobj);
sz = size_object(key);
if (IS_USMALL(0, sc.replaced)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -2052,9 +2179,8 @@ int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **ro
static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableTree *tb = &tbl->tree;
- return db_select_replace_tree_common(p, &tb->common, &tb->root,
- tid, pattern, ret, tb);
+ return db_select_replace_tree_common(p, tbl, tid, pattern, ret,
+ &tbl->tree, NULL);
}
int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
@@ -2328,7 +2454,7 @@ static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
-static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
@@ -2339,17 +2465,12 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
Eterm *ptpl;
int i;
int num_heads = 0;
- Eterm key;
- Eterm partly_bound;
- int res;
- Eterm least = 0;
- Eterm most = 0;
+ Eterm least = THE_NON_VALUE;
+ Eterm most = THE_NON_VALUE;
+ enum ms_key_boundness boundness;
- mpi->some_limitation = 1;
- mpi->got_partial = 0;
- mpi->something_can_match = 0;
+ mpi->key_boundness = MS_KEY_IMPOSSIBLE;
mpi->mp = NULL;
- mpi->save_term = NULL;
for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
++num_heads;
@@ -2370,6 +2491,7 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
Eterm match;
Eterm guard;
Eterm body;
+ Eterm key;
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
@@ -2401,30 +2523,29 @@ static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
}
++i;
- partly_bound = NIL;
- res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound);
- if ( res >= 0 ) { /* Can match something */
- key = 0;
- mpi->something_can_match = 1;
- if (res > 0) {
- key = GETKEY(tb,tuple_val(tpl));
- } else if (partly_bound != NIL) {
- mpi->got_partial = 1;
- key = partly_bound;
- } else {
- mpi->some_limitation = 0;
- }
- if (key != 0) {
- if (least == 0 ||
- partly_bound_can_match_lesser(key,least)) {
- least = key;
- }
- if (most == 0 ||
- partly_bound_can_match_greater(key,most)) {
- most = key;
- }
- }
- }
+ boundness = key_boundness(tb, tpl, &key);
+ switch (boundness)
+ {
+ case MS_KEY_BOUND:
+ case MS_KEY_PARTIALLY_BOUND:
+ if (is_non_value(least) || partly_bound_can_match_lesser(key,least)) {
+ least = key;
+ }
+ if (is_non_value(most) || partly_bound_can_match_greater(key,most)) {
+ most = key;
+ }
+ break;
+ case MS_KEY_IMPOSSIBLE:
+ case MS_KEY_UNBOUND:
+ break;
+ }
+ if (mpi->key_boundness > boundness)
+ mpi->key_boundness = boundness;
+ }
+
+ if (mpi->key_boundness == MS_KEY_BOUND && !CMP_EQ(least, most)) {
+ /* Several different bound keys */
+ mpi->key_boundness = MS_KEY_PARTIALLY_BOUND;
}
mpi->least = least;
mpi->most = most;
@@ -2608,11 +2729,27 @@ static int delsub(TreeDbTerm **this)
static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
Sint slot, DbTable *tb,
- DbTableTree *stack_container)
+ DbTableTree *stack_container,
+ CATreeRootIterator *iter)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
- DbTreeStack* stack = get_any_stack(tb,stack_container);
+ TreeDbTerm *lastobj;
+ Eterm lastkey;
+ TreeDbTerm **pp;
+ DbTreeStack* stack;
+
+ if (iter) {
+ /* Find first non-empty tree */
+ while (!root) {
+ TreeDbTerm** pp = catree_find_next_root(iter, NULL);
+ if (!pp)
+ return NULL;
+ root = *pp;
+ }
+ }
+
+ stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2624,56 +2761,83 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
are not recorded */
stack->pos = 0;
}
- if (EMPTY_NODE(stack)) {
- this = root;
- if (this == NULL)
- goto done;
- while (this->left != NULL){
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- stack->slot = 1;
- }
- this = TOP_NODE(stack);
- while (stack->slot != slot && this != NULL) {
- if (slot > stack->slot) {
- if (this->right != NULL) {
- this = this->right;
- while (this->left != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->left == tmp)
- break;
- }
- }
- ++(stack->slot);
- } else {
- if (this->left != NULL) {
- this = this->left;
- while (this->right != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- PUSH_NODE(stack, this);
- } else {
- for (;;) {
- tmp = POP_NODE(stack);
- this = TOP_NODE(stack);
- if (this == NULL || this->right == tmp)
- break;
- }
- }
- --(stack->slot);
- }
+ while (1) {
+ if (EMPTY_NODE(stack)) {
+ this = root;
+ if (this == NULL)
+ goto next_root;
+ while (this->left != NULL){
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ stack->slot++;
+ }
+ this = TOP_NODE(stack);
+ while (stack->slot != slot) {
+ ASSERT(this);
+ lastobj = this;
+ if (slot > stack->slot) {
+ if (this->right != NULL) {
+ this = this->right;
+ while (this->left != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->left == tmp)
+ break;
+ }
+ }
+ ++(stack->slot);
+ } else {
+ if (this->left != NULL) {
+ this = this->left;
+ while (this->right != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (!this)
+ goto next_root;
+ if (this->right == tmp)
+ break;
+ }
+ }
+ --(stack->slot);
+ }
+ }
+ /* Found slot */
+ ASSERT(this);
+ break;
+
+next_root:
+ if (!iter)
+ break; /* EOT */
+
+ ASSERT(slot > stack->slot);
+ if (lastobj) {
+ lastkey = GETKEY(tb, lastobj->dbterm.tpl);
+ lastobj = NULL;
+ }
+ pp = catree_find_next_root(iter, &lastkey);
+ if (!pp)
+ break; /* EOT */
+ root = *pp;
+ stack->pos = 0;
+ find_next(&tb->common, root, stack, lastkey);
}
-done:
+
release_stack(tb,stack_container,stack);
return this;
}
@@ -2708,7 +2872,7 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
this = this->right;
} else if (c < 0) {
if (this->left == NULL) /* Done */
- return this;
+ goto found_next;
else
this = this->left;
} else
@@ -2723,8 +2887,6 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
this = this->left;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- ++(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2733,9 +2895,12 @@ static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
return NULL;
}
} while (this->right == tmp);
- if (stack->slot > 0)
- ++(stack->slot);
}
+
+found_next:
+ if (stack->slot > 0)
+ ++(stack->slot);
+
return this;
}
@@ -2765,7 +2930,7 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
this = this->left;
} else if (c > 0) {
if (this->right == NULL) /* Done */
- return this;
+ goto found_prev;
else
this = this->right;
} else
@@ -2780,8 +2945,6 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
this = this->right;
PUSH_NODE(stack, this);
}
- if (stack->slot > 0)
- --(stack->slot);
} else {
do {
tmp = POP_NODE(stack);
@@ -2790,74 +2953,92 @@ static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
return NULL;
}
} while (this->left == tmp);
- if (stack->slot > 0)
- --(stack->slot);
}
+
+found_prev:
+ if (stack->slot > 0)
+ --(stack->slot);
+
return this;
}
-static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+
+static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_next_from_pb_key_root(key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) >= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) >= 0) {
if (this->right == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->right == tmp);
- return this;
- } else
- this = this->right;
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->right;
} else /*if (c < 0)*/ {
if (this->left == NULL) /* Done */
return this;
- else
- this = this->left;
+ candidate = stack->pos;
+ this = this->left;
}
}
}
-static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
- DbTreeStack* stack, Eterm key)
+static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
+ DbTreeStack* stack, Eterm key,
+ CATreeRootIterator* iter)
{
+ TreeDbTerm* root;
TreeDbTerm *this;
- TreeDbTerm *tmp;
+ Uint candidate = 0;
Sint c;
+ if (iter) {
+ *rootpp = catree_find_prev_from_pb_key_root(key, iter);
+ ASSERT(*rootpp);
+ root = **rootpp;
+ }
+ else {
+ *rootpp = &tbl->tree.root;
+ root = tbl->tree.root;
+ }
+
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
- if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl))) <= 0) {
+ if (( c = cmp_partly_bound(key,GETKEY(tbl, this->dbterm.tpl))) <= 0) {
if (this->left == NULL) {
- do {
- tmp = POP_NODE(stack);
- if (( this = TOP_NODE(stack)) == NULL) {
- return NULL;
- }
- } while (this->left == tmp);
- return this;
- } else
- this = this->left;
- } else /*if (c < 0)*/ {
+ stack->pos = candidate;
+ return TOP_NODE(stack);
+ }
+ this = this->left;
+ } else /*if (c > 0)*/ {
if (this->right == NULL) /* Done */
return this;
- else
- this = this->right;
+ candidate = stack->pos;
+ this = this->right;
}
}
}
@@ -3046,38 +3227,53 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_prev(tb, *root, stack, lastkey);
+ next = find_prev(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
@@ -3085,38 +3281,53 @@ static void traverse_backwards(DbTableCommon *tb,
* Traverse the tree with a callback function, used by db_match_xxx
*/
static void traverse_forward(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableCommon *,
- TreeDbTerm *,
- void *,
- int),
- void *context)
+ traverse_doit_funcT* doit,
+ struct select_common *context,
+ CATreeRootIterator* iter)
{
TreeDbTerm *this, *next;
+ TreeDbTerm **root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_next_root(iter, NULL);
+ if (!root)
+ return;
+ }
+ context->root = root;
+ }
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->left;
+ }
+ next = TOP_NODE(stack);
} else {
- next = find_next(tb, *root, stack, lastkey);
+ next = find_next(tb, *root, stack, lastkey);
}
- while ((this = next) != NULL) {
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_next(tb, *root, stack, lastkey);
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_next_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_next(tb, *root, stack, lastkey);
}
}
@@ -3124,77 +3335,87 @@ static void traverse_forward(DbTableCommon *tb,
* Traverse the tree with an update callback function, used by db_select_replace
*/
static void traverse_update_backwards(DbTableCommon *tb,
- TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
int (*doit)(DbTableCommon*,
TreeDbTerm**,
- void*,
+ struct select_common*,
int),
- void* context)
+ struct select_common* context,
+ CATreeRootIterator* iter)
{
int res;
TreeDbTerm *this, *next, **this_ptr;
+ TreeDbTerm** root = context->root;
if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- return;
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ context->root = root;
+ }
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next) {
+ PUSH_NODE(stack, next);
+ next = next->right;
}
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
+ next = TOP_NODE(stack);
}
+ else
+ next = find_prev(tb, *root, stack, lastkey);
+
- while ((this = next) != NULL) {
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
+ while (1) {
+ while (next) {
+ this = next;
+ this_ptr = find_ptr(tb, root, stack, this);
+ ASSERT(this_ptr != NULL);
+ res = (*doit)(tb, this_ptr, context, 0);
+ this = *this_ptr;
+ REPLACE_TOP_NODE(stack, this);
+ if (!res)
+ return;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
+ }
+
+ if (!iter)
+ return;
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
}
}
-/*
- * Returns 0 if not given 1 if given and -1 on no possible match
- * if key is given; *ret is set to point to the object concerned.
- */
-static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
- TreeDbTerm ***ret, Eterm *partly_bound)
+static enum ms_key_boundness key_boundness(DbTableCommon *tb,
+ Eterm pattern, Eterm *keyp)
{
- TreeDbTerm **this;
Eterm key;
- ASSERT(ret != NULL);
if (pattern == am_Underscore || db_is_variable(pattern) != -1)
- return 0;
+ return MS_KEY_UNBOUND;
key = db_getkey(tb->keypos, pattern);
if (is_non_value(key))
- return -1; /* can't possibly match anything */
+ return MS_KEY_IMPOSSIBLE; /* can't possibly match anything */
if (!db_has_variable(key)) { /* Bound key */
- if (( this = find_node2(tb, root, key) ) == NULL) {
- return -1;
- }
- *ret = this;
- return 1;
- } else if (partly_bound != NULL && key != am_Underscore &&
- db_is_variable(key) < 0 && !db_has_map(key))
- *partly_bound = key;
+ *keyp = key;
+ return MS_KEY_BOUND;
+ } else if (key != am_Underscore &&
+ db_is_variable(key) < 0 && !db_has_map(key)) {
+
+ *keyp = key;
+ return MS_KEY_PARTIALLY_BOUND;
+ }
- return 0;
+ return MS_KEY_UNBOUND;
}
@@ -3262,7 +3483,8 @@ static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
}
}
-static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key) {
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
+{
int done = 0;
Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
#ifdef HARDDEBUG
@@ -3478,7 +3700,8 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
* Callback functions for the different match functions
*/
-static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3513,7 +3736,8 @@ static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_count_context *sc = (struct select_count_context *) ptr;
@@ -3537,7 +3761,8 @@ static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common* ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3575,7 +3800,8 @@ static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
}
-static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this,
+ struct select_common *ptr,
int forward)
{
struct select_delete_context *sc = (struct select_delete_context *) ptr;
@@ -3594,7 +3820,7 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
key = GETKEY(sc->tb, this->dbterm.tpl);
- linkout_tree(sc->tb, sc->root, key, sc->stack);
+ linkout_tree(sc->tb, sc->common.root, key, sc->stack);
sc->erase_lastterm = 1;
++sc->accum;
}
@@ -3604,7 +3830,8 @@ static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr,
+static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this,
+ struct select_common* ptr,
int forward)
{
struct select_replace_context *sc = (struct select_replace_context *) ptr;
diff --git a/erts/emulator/beam/erl_db_tree_util.h b/erts/emulator/beam/erl_db_tree_util.h
index fbd9a9124a..02df74678d 100644
--- a/erts/emulator/beam/erl_db_tree_util.h
+++ b/erts/emulator/beam/erl_db_tree_util.h
@@ -92,49 +92,54 @@ int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
Eterm *ret, DbTableTree *stack_container);
int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
Eterm slot_term, Eterm *ret,
- DbTableTree *stack_container);
-int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTableTree *stack_container,
+ CATreeRootIterator*);
+int db_select_chunk_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret,
- DbTableTree *stack_container);
-int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTableTree *stack_container,
+ CATreeRootIterator*);
+int db_select_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, int reverse, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator*);
int db_select_delete_tree_common(Process *p, DbTable *tbl,
- TreeDbTerm **root,
Eterm tid, Eterm pattern,
Eterm *ret,
- DbTreeStack* stack);
+ DbTreeStack* stack,
+ CATreeRootIterator* iter);
int db_select_continue_tree_common(Process *p,
DbTableCommon *tb,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_delete_continue_tree_common(Process *p,
DbTable *tbl,
- TreeDbTerm **root,
Eterm continuation,
Eterm *ret,
- DbTreeStack* stack);
-int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack* stack,
+ CATreeRootIterator* iter);
+int db_select_count_tree_common(Process *p, DbTable *tb,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_count_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable *tb,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
-int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
+int db_select_replace_tree_common(Process *p, DbTable*,
Eterm tid, Eterm pattern, Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_select_replace_continue_tree_common(Process *p,
- DbTableCommon *tb,
- TreeDbTerm **root,
+ DbTable*,
Eterm continuation,
Eterm *ret,
- DbTableTree *stack_container);
+ DbTableTree *stack_container,
+ CATreeRootIterator* iter);
int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
Eterm key, Eterm *ret,
DbTreeStack *stack /* NULL if no static stack */);
@@ -148,4 +153,6 @@ int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
DbTableTree *stack_container);
void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
DbTableTree *stack_container);
+Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+
#endif /* _DB_TREE_UTIL_H */
diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h
index be74bc795c..e1af9210ea 100644
--- a/erts/emulator/beam/erl_db_util.h
+++ b/erts/emulator/beam/erl_db_util.h
@@ -89,9 +89,16 @@ typedef struct {
void** bp; /* {Hash|Tree}DbTerm** */
Uint new_size;
int flags;
- void* lck;
- void* lck2;
- int current_level;
+ union {
+ struct {
+ erts_rwmtx_t* lck;
+ } hash;
+ struct {
+ struct DbTableCATreeNode* base_node;
+ struct DbTableCATreeNode* parent;
+ int current_level;
+ } catree;
+ } u;
} DbUpdateHandle;
@@ -290,6 +297,8 @@ typedef struct db_table_common {
#define DB_NAMED_TABLE (1 << 11)
#define DB_BUSY (1 << 12)
+#define DB_CATREE_FORCE_SPLIT (1 << 31) /* erts_debug */
+
#define IS_HASH_TABLE(Status) (!!((Status) & \
(DB_BAG | DB_SET | DB_DUPLICATE_BAG)))
#define IS_TREE_TABLE(Status) (!!((Status) & \
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 987e370341..cfa63e7cd9 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -92,7 +92,7 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "db_tab", "address" },
{ "db_tab_fix", "address" },
{ "db_hash_slot", "address" },
- { "erl_db_catree_base_node", "term" },
+ { "erl_db_catree_base_node", NULL },
{ "erl_db_catree_route_node", "index" },
{ "resource_monitors", "address" },
{ "driver_list", NULL },
@@ -194,6 +194,12 @@ struct lc_locked_lock_t_ {
unsigned int line;
erts_lock_flags_t flags;
erts_lock_options_t taken_options;
+ /*
+ * Pointer back to the lock instance if it exists or NULL for proc locks.
+ * If set, we use it to allow trylock of other lock instance
+ * but with identical lock order as an already locked lock.
+ */
+ erts_lc_lock_t *lck;
};
typedef struct {
@@ -407,6 +413,10 @@ new_locked_lock(lc_thread_t* thr,
ll->line = line;
ll->flags = lck->flags;
ll->taken_options = options;
+ if ((lck->flags & ERTS_LOCK_FLAGS_MASK_TYPE) == ERTS_LOCK_FLAGS_TYPE_PROCLOCK)
+ ll->lck = NULL;
+ else
+ ll->lck = lck;
return ll;
}
@@ -1005,17 +1015,14 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
*/
/* Check that we are not trying to lock this lock twice */
- while (1) {
- if (order <= 0) {
- if (order == 0)
- lock_twice("Trylocking", thr, lck, options);
- break;
- }
+ do {
+ if (order == 0 && (ll->lck == lck || !ll->lck))
+ lock_twice("Trylocking", thr, lck, options);
ll = ll->prev;
if (!ll)
break;
order = compare_locked_by_id_extra(ll, lck);
- }
+ } while (order >= 0);
#ifndef ERTS_LC_ALLWAYS_FORCE_BUSY_TRYLOCK_ON_LOCK_ORDER_VIOLATION
/* We only force busy if a lock order violation would occur
@@ -1034,11 +1041,6 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
#endif
}
-/*
- * locked = 0 trylock failed
- * locked > 0 trylock succeeded
- * locked < 0 prelocking of newly created lock (no lock order check)
- */
void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t options,
char *file, unsigned int line)
{
@@ -1069,8 +1071,8 @@ void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t
for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
int order = compare_locked_by_id_extra(tl_lck, lck);
if (order <= 0) {
- if (order == 0 && locked >= 0)
- lock_twice("Trylocking", thr, lck, options);
+ if (order == 0 && (tl_lck->lck == lck || !tl_lck->lck))
+ lock_twice("Trylocking", thr, lck, options);
if (locked) {
ll->next = tl_lck->next;
ll->prev = tl_lck;
@@ -1210,7 +1212,7 @@ void erts_lc_lock_flg_x(erts_lc_lock_t *lck, erts_lock_options_t options,
thr->locked.last->next = new_ll;
thr->locked.last = new_ll;
}
- else if (thr->locked.last->id == lck->id && thr->locked.last->extra == lck->extra)
+ else if (order == 0)
lock_twice("Locking", thr, lck, options);
else
lock_order_violation(thr, lck);
diff --git a/lib/stdlib/doc/src/ets.xml b/lib/stdlib/doc/src/ets.xml
index 57a19ef2ca..611b176613 100644
--- a/lib/stdlib/doc/src/ets.xml
+++ b/lib/stdlib/doc/src/ets.xml
@@ -611,9 +611,8 @@ Error: fun containing local Erlang function calls
</item>
<item>
<p><c>Item=stats, Value=tuple()</c></p>
- <p>Returns internal statistics about <c>set</c>, <c>bag</c>, and
- <c>duplicate_bag</c> tables on an internal format used by OTP
- test suites. Not for production use.</p></item>
+ <p>Returns internal statistics about tables on an internal format
+ used by OTP test suites. Not for production use.</p></item>
</list>
</desc>
</func>
@@ -1140,16 +1139,11 @@ ets:select(Table, MatchSpec),</code>
<c>set</c>, <c>bag</c> and <c>duplicate_bag</c>. For
<c>ordered_set</c> the memory overhead depends on the number
of inserted objects and the amount of actual detected
- concurrency. The memory overhead can be especially large when both
- options are combined.</p>
+ concurrency in runtime. The memory overhead can be especially
+ large when both options are combined.</p>
<note>
<p>Prior to stdlib-3.7 (OTP-22.0) <c>write_concurrency</c> had no
effect on <c>ordered_set</c>.</p>
- <p>The current implementation of <c>write_concurrency</c> for
- <c>ordered_set</c> does only improve explicit single key
- operations. Mixing single key operations with operations
- potentially accessing multiple keys may even yield worse
- performance with <c>write_concurrency</c> on <c>ordered_set</c>.</p>
</note>
<marker id="new_2_read_concurrency"></marker>
</item>
diff --git a/lib/stdlib/src/stdlib.app.src b/lib/stdlib/src/stdlib.app.src
index cd09872b87..9cd425db9a 100644
--- a/lib/stdlib/src/stdlib.app.src
+++ b/lib/stdlib/src/stdlib.app.src
@@ -108,7 +108,7 @@
dets]},
{applications, [kernel]},
{env, []},
- {runtime_dependencies, ["sasl-3.0","kernel-6.0","erts-10.0","crypto-3.3",
+ {runtime_dependencies, ["sasl-3.0","kernel-6.0","erts-@OTP-15128@","crypto-3.3",
"compiler-5.0"]}
]}.
diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl
index 2c0692855f..e49181b12f 100644
--- a/lib/stdlib/test/ets_SUITE.erl
+++ b/lib/stdlib/test/ets_SUITE.erl
@@ -66,6 +66,7 @@
meta_lookup_named_read/1, meta_lookup_named_write/1,
meta_newdel_unnamed/1, meta_newdel_named/1]).
-export([smp_insert/1, smp_fixed_delete/1, smp_unfix_fix/1, smp_select_delete/1,
+ smp_ordered_iteration/1,
smp_select_replace/1, otp_8166/1, otp_8732/1, delete_unfix_race/1]).
-export([throughput_benchmark/0, test_throughput_benchmark/1]).
-export([exit_large_table_owner/1,
@@ -133,7 +134,8 @@ all() ->
otp_5340, otp_6338, otp_6842_select_1000, otp_7665,
otp_8732, meta_wb, grow_shrink, grow_pseudo_deleted,
shrink_pseudo_deleted, {group, meta_smp}, smp_insert,
- smp_fixed_delete, smp_unfix_fix, smp_select_replace,
+ smp_fixed_delete, smp_unfix_fix, smp_select_replace,
+ smp_ordered_iteration,
smp_select_delete, otp_8166, exit_large_table_owner,
exit_many_large_table_owner, exit_many_tables_owner,
exit_many_many_tables_owner, write_concurrency, heir,
@@ -796,15 +798,16 @@ t_delete_all_objects(Config) when is_list(Config) ->
get_kept_objects(T) ->
case ets:info(T,stats) of
- false ->
- 0;
{_,_,_,_,_,_,KO} ->
- KO
+ KO;
+ _ ->
+ 0
end.
t_delete_all_objects_do(Opts) ->
- T=ets_new(x,Opts),
- filltabint(T,4000),
+ KeyRange = 4000,
+ T=ets_new(x, Opts, KeyRange),
+ filltabint(T,KeyRange),
O=ets:first(T),
ets:next(T,O),
ets:safe_fixtable(T,true),
@@ -813,13 +816,13 @@ t_delete_all_objects_do(Opts) ->
0 = ets:info(T,size),
case ets:info(T,type) of
ordered_set -> ok;
- _ -> 4000 = get_kept_objects(T)
+ _ -> KeyRange = get_kept_objects(T)
end,
ets:safe_fixtable(T,false),
0 = ets:info(T,size),
0 = get_kept_objects(T),
- filltabint(T,4000),
- 4000 = ets:info(T,size),
+ filltabint(T, KeyRange),
+ KeyRange = ets:info(T,size),
true = ets:delete_all_objects(T),
0 = ets:info(T,size),
ets:delete(T),
@@ -3104,18 +3107,18 @@ setbag(Config) when is_list(Config) ->
%% Test case to check proper return values for illegal ets_new() calls.
badnew(Config) when is_list(Config) ->
EtsMem = etsmem(),
- {'EXIT',{badarg,_}} = (catch ets_new(12,[])),
- {'EXIT',{badarg,_}} = (catch ets_new({a,b},[])),
- {'EXIT',{badarg,_}} = (catch ets_new(name,[foo])),
- {'EXIT',{badarg,_}} = (catch ets_new(name,{bag})),
- {'EXIT',{badarg,_}} = (catch ets_new(name,bag)),
+ {'EXIT',{badarg,_}} = (catch ets:new(12,[])),
+ {'EXIT',{badarg,_}} = (catch ets:new({a,b},[])),
+ {'EXIT',{badarg,_}} = (catch ets:new(name,[foo])),
+ {'EXIT',{badarg,_}} = (catch ets:new(name,{bag})),
+ {'EXIT',{badarg,_}} = (catch ets:new(name,bag)),
verify_etsmem(EtsMem).
%% OTP-2314. Test case to check that a non-proper list does not
%% crash the emulator.
verybadnew(Config) when is_list(Config) ->
EtsMem = etsmem(),
- {'EXIT',{badarg,_}} = (catch ets_new(verybad,[set|protected])),
+ {'EXIT',{badarg,_}} = (catch ets:new(verybad,[set|protected])),
verify_etsmem(EtsMem).
%% Small check to see if named tables work.
@@ -3464,9 +3467,11 @@ delete_tab_do(Opts) ->
%% Check that ets:delete/1 works and that other processes can run.
delete_large_tab(Config) when is_list(Config) ->
ct:timetrap({minutes,60}), %% valgrind needs a lot
- Data = [{erlang:phash2(I, 16#ffffff),I} || I <- lists:seq(1, 200000)],
+ KeyRange = 16#ffffff,
+ Data = [{erlang:phash2(I, KeyRange),I} || I <- lists:seq(1, 200000)],
EtsMem = etsmem(),
- repeat_for_opts(fun(Opts) -> delete_large_tab_do(Opts,Data) end),
+ repeat_for_opts(fun(Opts) -> delete_large_tab_do(key_range(Opts,KeyRange),
+ Data) end),
verify_etsmem(EtsMem).
delete_large_tab_do(Opts,Data) ->
@@ -3542,9 +3547,13 @@ delete_large_tab_2(Name, Flags, Data, Fix) ->
%% Delete a large name table and try to create a new table with
%% the same name in another process.
delete_large_named_table(Config) when is_list(Config) ->
- Data = [{erlang:phash2(I, 16#ffffff),I} || I <- lists:seq(1, 200000)],
+ KeyRange = 16#ffffff,
+ Data = [{erlang:phash2(I, KeyRange),I} || I <- lists:seq(1, 200000)],
EtsMem = etsmem(),
- repeat_for_opts(fun(Opts) -> delete_large_named_table_do(Opts,Data) end),
+ repeat_for_opts(fun(Opts) ->
+ delete_large_named_table_do(key_range(Opts,KeyRange),
+ Data)
+ end),
verify_etsmem(EtsMem),
ok.
@@ -3585,8 +3594,12 @@ delete_large_named_table_2(Name, Flags, Data, Fix) ->
%% Delete a large table, and kill the process during the delete.
evil_delete(Config) when is_list(Config) ->
- Data = [{I,I*I} || I <- lists:seq(1, 100000)],
- repeat_for_opts(fun(Opts) -> evil_delete_do(Opts,Data) end).
+ KeyRange = 100000,
+ Data = [{I,I*I} || I <- lists:seq(1, KeyRange)],
+ repeat_for_opts(fun(Opts) ->
+ evil_delete_do(key_range(Opts,KeyRange),
+ Data)
+ end).
evil_delete_do(Opts,Data) ->
EtsMem = etsmem(),
@@ -4154,19 +4167,12 @@ match_object2(Config) when is_list(Config) ->
match_object2_do(Opts) ->
EtsMem = etsmem(),
- Tab = ets_new(foo, [bag, {keypos, 2} | Opts]),
- fill_tab2(Tab, 0, 13005), % match_db_object does 1000
+ KeyRange = 13005,
+ Tab = ets_new(foo, [{keypos, 2} | Opts], KeyRange),
+ fill_tab2(Tab, 0, KeyRange), % match_db_object does 1000
% elements per pass, might
% change in the future.
- case catch ets:match_object(Tab, {hej, '$1'}) of
- {'EXIT', _} ->
- ets:delete(Tab),
- ct:fail("match_object EXIT:ed");
- [] ->
- io:format("Nothing matched.");
- List ->
- io:format("Matched:~p~n",[List])
- end,
+ [] = ets:match_object(Tab, {hej, '$1'}),
ets:delete(Tab),
verify_etsmem(EtsMem).
@@ -4411,10 +4417,11 @@ tab2file2(Config) when is_list(Config) ->
tab2file2_do(Opts, Config) ->
EtsMem = etsmem(),
- Tab = ets_new(ets_SUITE_foo_tab, [named_table, private,
- {keypos, 2} | Opts]),
+ KeyRange = 10000,
+ Tab = ets_new(ets_SUITE_foo_tab, [named_table, private, {keypos, 2} | Opts],
+ KeyRange),
FName = filename:join([proplists:get_value(priv_dir, Config),"tab2file2_case"]),
- ok = fill_tab2(Tab, 0, 10000), % Fill up the table (grucho mucho!)
+ ok = fill_tab2(Tab, 0, KeyRange), % Fill up the table (grucho mucho!)
Len = length(ets:tab2list(Tab)),
Mem = ets:info(Tab, memory),
Type = ets:info(Tab, type),
@@ -4473,8 +4480,9 @@ tabfile_ext1(Config) when is_list(Config) ->
tabfile_ext1_do(Opts,Config) ->
FName = filename:join([proplists:get_value(priv_dir, Config),"nisse.dat"]),
FName2 = filename:join([proplists:get_value(priv_dir, Config),"countflip.dat"]),
- L = lists:seq(1,10),
- T = ets_new(x,Opts),
+ KeyRange = 10,
+ L = lists:seq(1,KeyRange),
+ T = ets_new(x,Opts,KeyRange),
Name = make_ref(),
[ets:insert(T,{X,integer_to_list(X)}) || X <- L],
ok = ets:tab2file(T,FName,[{extended_info,[object_count]}]),
@@ -4511,8 +4519,9 @@ tabfile_ext2(Config) when is_list(Config) ->
tabfile_ext2_do(Opts,Config) ->
FName = filename:join([proplists:get_value(priv_dir, Config),"olle.dat"]),
FName2 = filename:join([proplists:get_value(priv_dir, Config),"bitflip.dat"]),
- L = lists:seq(1,10),
- T = ets_new(x,Opts),
+ KeyRange = 10,
+ L = lists:seq(1, KeyRange),
+ T = ets_new(x, Opts, KeyRange),
Name = make_ref(),
[ets:insert(T,{X,integer_to_list(X)}) || X <- L],
ok = ets:tab2file(T,FName,[{extended_info,[md5sum]}]),
@@ -4681,9 +4690,10 @@ heavy_lookup(Config) when is_list(Config) ->
heavy_lookup_do(Opts) ->
EtsMem = etsmem(),
- Tab = ets_new(foobar_table, [{keypos, 2} | Opts]),
- ok = fill_tab2(Tab, 0, 7000),
- _ = [do_lookup(Tab, 6999) || _ <- lists:seq(1, 50)],
+ KeyRange = 7000,
+ Tab = ets_new(foobar_table, [{keypos, 2} | Opts], KeyRange),
+ ok = fill_tab2(Tab, 0, KeyRange),
+ _ = [do_lookup(Tab, KeyRange-1) || _ <- lists:seq(1, 50)],
true = ets:delete(Tab),
verify_etsmem(EtsMem).
@@ -4704,11 +4714,12 @@ heavy_lookup_element(Config) when is_list(Config) ->
heavy_lookup_element_do(Opts) ->
EtsMem = etsmem(),
- Tab = ets_new(foobar_table, [{keypos, 2} | Opts]),
- ok = fill_tab2(Tab, 0, 7000),
+ KeyRange = 7000,
+ Tab = ets_new(foobar_table, [{keypos, 2} | Opts], KeyRange),
+ ok = fill_tab2(Tab, 0, KeyRange),
%% lookup ALL elements 50 times
Laps = 50 div syrup_factor(),
- _ = [do_lookup_element(Tab, 6999, 1) || _ <- lists:seq(1, Laps)],
+ _ = [do_lookup_element(Tab, KeyRange-1, 1) || _ <- lists:seq(1, Laps)],
true = ets:delete(Tab),
verify_etsmem(EtsMem).
@@ -4731,11 +4742,11 @@ heavy_concurrent(Config) when is_list(Config) ->
repeat_for_opts_all_set_table_types(fun do_heavy_concurrent/1).
do_heavy_concurrent(Opts) ->
- Size = 10000,
+ KeyRange = 10000,
Laps = 10000 div syrup_factor(),
EtsMem = etsmem(),
- Tab = ets_new(blupp, [public, {keypos, 2} | Opts]),
- ok = fill_tab2(Tab, 0, Size),
+ Tab = ets_new(blupp, [public, {keypos, 2} | Opts], KeyRange),
+ ok = fill_tab2(Tab, 0, KeyRange),
Procs = lists:map(
fun (N) ->
my_spawn_link(
@@ -5014,6 +5025,7 @@ filltabint(Tab,0) ->
filltabint(Tab,N) ->
ets:insert(Tab,{N,integer_to_list(N)}),
filltabint(Tab,N-1).
+
filltabint2(Tab,0) ->
Tab;
filltabint2(Tab,N) ->
@@ -5230,8 +5242,9 @@ gen_dets_filename(Config,N) ->
otp_6842_select_1000(Config) when is_list(Config) ->
repeat_for_opts_all_ord_set_table_types(
fun(Opts) ->
- Tab = ets_new(xxx,Opts),
- [ets:insert(Tab,{X,X}) || X <- lists:seq(1,10000)],
+ KeyRange = 10000,
+ Tab = ets_new(xxx, Opts, KeyRange),
+ [ets:insert(Tab,{X,X}) || X <- lists:seq(1,KeyRange)],
AllTrue = lists:duplicate(10,true),
AllTrue =
[ length(
@@ -5420,7 +5433,7 @@ grow_shrink(Config) when is_list(Config) ->
fun(Opts) ->
EtsMem = etsmem(),
- Set = ets_new(a, Opts),
+ Set = ets_new(a, Opts, 5000),
grow_shrink_0(0, 3071, 3000, 5000, Set),
ets:delete(Set),
@@ -5449,14 +5462,13 @@ grow_shrink_3(N, ShrinkTo, T) ->
true = ets:delete(T, N),
grow_shrink_3(N-1, ShrinkTo, T).
-%% Grow a table that still contains pseudo-deleted objects.
+%% Grow a hash table that still contains pseudo-deleted objects.
grow_pseudo_deleted(Config) when is_list(Config) ->
only_if_smp(fun() -> grow_pseudo_deleted_do() end).
grow_pseudo_deleted_do() ->
lists:foreach(fun(Type) -> grow_pseudo_deleted_do(Type) end,
- [set,cat_ord_set,stim_cat_ord_set,
- ordered_set,bag,duplicate_bag]).
+ [set,bag,duplicate_bag]).
grow_pseudo_deleted_do(Type) ->
process_flag(scheduler,1),
@@ -5471,12 +5483,7 @@ grow_pseudo_deleted_do(Type) ->
[true]}]),
Left = Mult*(Mod-1),
Left = ets:info(T,size),
- case Type of
- cat_ord_set -> ok;
- stim_cat_ord_set -> ok;
- ordered_set -> ok;
- _ -> Mult = get_kept_objects(T)
- end,
+ Mult = get_kept_objects(T),
filltabstr(T,Mult),
my_spawn_opt(
fun() ->
@@ -5508,14 +5515,13 @@ grow_pseudo_deleted_do(Type) ->
ets:delete(T),
process_flag(scheduler,0).
-%% Shrink a table that still contains pseudo-deleted objects.
+%% Shrink a hash table that still contains pseudo-deleted objects.
shrink_pseudo_deleted(Config) when is_list(Config) ->
only_if_smp(fun()->shrink_pseudo_deleted_do() end).
shrink_pseudo_deleted_do() ->
lists:foreach(fun(Type) -> shrink_pseudo_deleted_do(Type) end,
- [set,cat_ord_set,stim_cat_ord_set,
- ordered_set,bag,duplicate_bag]).
+ [set,bag,duplicate_bag]).
shrink_pseudo_deleted_do(Type) ->
process_flag(scheduler,1),
@@ -5529,12 +5535,7 @@ shrink_pseudo_deleted_do(Type) ->
[{'>', '$1', Half}],
[true]}]),
Half = ets:info(T,size),
- case Type of
- cat_ord_set -> ok;
- stim_cat_ord_set -> ok;
- ordered_set -> ok;
- _ -> Half = get_kept_objects(T)
- end,
+ Half = get_kept_objects(T),
my_spawn_opt(
fun()-> true = ets:info(T,fixed),
Self ! start,
@@ -5638,9 +5639,11 @@ smp_insert(Config) when is_list(Config) ->
[[set,ordered_set,stim_cat_ord_set]]).
smp_insert_do(Opts) ->
- ets_new(smp_insert,[named_table,public,{write_concurrency,true}|Opts]),
+ KeyRange = 10000,
+ ets_new(smp_insert,[named_table,public,{write_concurrency,true}|Opts],
+ KeyRange),
InitF = fun(_) -> ok end,
- ExecF = fun(_) -> true = ets:insert(smp_insert,{rand:uniform(10000)})
+ ExecF = fun(_) -> true = ets:insert(smp_insert,{rand:uniform(KeyRange)})
end,
FiniF = fun(_) -> ok end,
run_smp_workers(InitF,ExecF,FiniF,100000),
@@ -5649,41 +5652,36 @@ smp_insert_do(Opts) ->
%% Concurrent deletes on same fixated table.
smp_fixed_delete(Config) when is_list(Config) ->
- only_if_smp(fun()->
- repeat_for_opts(fun smp_fixed_delete_do/1,
- [[set,ordered_set,stim_cat_ord_set]])
- end).
-
-smp_fixed_delete_do(Opts) ->
- begin
- T = ets_new(foo,[public,{write_concurrency,true}|Opts]),
- %%Mem = ets:info(T,memory),
- NumOfObjs = 100000,
- filltabint(T,NumOfObjs),
- ets:safe_fixtable(T,true),
- Buckets = num_of_buckets(T),
- InitF = fun([ProcN,NumOfProcs|_]) -> {ProcN,NumOfProcs} end,
- ExecF = fun({Key,_}) when Key > NumOfObjs ->
- [end_of_work];
- ({Key,Increment}) ->
- true = ets:delete(T,Key),
- {Key+Increment,Increment}
- end,
- FiniF = fun(_) -> ok end,
- run_sched_workers(InitF,ExecF,FiniF,NumOfObjs),
- 0 = ets:info(T,size),
- true = ets:info(T,fixed),
- Buckets = num_of_buckets(T),
- case ets:info(T,type) of
- set -> NumOfObjs = get_kept_objects(T);
- _ -> ok
- end,
- ets:safe_fixtable(T,false),
- %% Will fail as unfix does not shrink the table:
- %%Mem = ets:info(T,memory),
- %%verify_table_load(T),
- ets:delete(T)
- end.
+ only_if_smp(fun() -> smp_fixed_delete_do() end).
+
+smp_fixed_delete_do() ->
+ T = ets_new(foo,[public,{write_concurrency,true}]),
+ %%Mem = ets:info(T,memory),
+ NumOfObjs = 100000,
+ filltabint(T,NumOfObjs),
+ ets:safe_fixtable(T,true),
+ Buckets = num_of_buckets(T),
+ InitF = fun([ProcN,NumOfProcs|_]) -> {ProcN,NumOfProcs} end,
+ ExecF = fun({Key,_}) when Key > NumOfObjs ->
+ [end_of_work];
+ ({Key,Increment}) ->
+ true = ets:delete(T,Key),
+ {Key+Increment,Increment}
+ end,
+ FiniF = fun(_) -> ok end,
+ run_sched_workers(InitF,ExecF,FiniF,NumOfObjs),
+ 0 = ets:info(T,size),
+ true = ets:info(T,fixed),
+ Buckets = num_of_buckets(T),
+ case ets:info(T,type) of
+ set -> NumOfObjs = get_kept_objects(T);
+ _ -> ok
+ end,
+ ets:safe_fixtable(T,false),
+ %% Will fail as unfix does not shrink the table:
+ %%Mem = ets:info(T,memory),
+ %%verify_table_load(T),
+ ets:delete(T).
%% ERL-720
%% Provoke race between ets:delete and table unfix (by select_count)
@@ -5928,8 +5926,10 @@ verify_table_load(T) ->
otp_8732(Config) when is_list(Config) ->
repeat_for_all_ord_set_table_types(
fun(Opts) ->
- Tab = ets_new(noname,Opts),
- filltabstr(Tab,999),
+ KeyRange = 999,
+ KeyFun = fun(K) -> integer_to_list(K) end,
+ Tab = ets_new(noname,Opts, KeyRange, KeyFun),
+ filltabstr(Tab, KeyRange),
ets:insert(Tab,{[],"nasty NIL object"}),
[] = ets:match(Tab,{'_',nomatch}) %% Will hang if bug not fixed
end),
@@ -5939,11 +5939,14 @@ otp_8732(Config) when is_list(Config) ->
%% Run concurrent select_delete (and inserts) on same table.
smp_select_delete(Config) when is_list(Config) ->
repeat_for_opts(fun smp_select_delete_do/1,
- [[set,ordered_set,stim_cat_ord_set], read_concurrency, compressed]).
+ [[set,ordered_set,stim_cat_ord_set],
+ read_concurrency, compressed]).
smp_select_delete_do(Opts) ->
+ KeyRange = 10000,
begin % indentation
- T = ets_new(smp_select_delete,[named_table,public,{write_concurrency,true}|Opts]),
+ T = ets_new(smp_select_delete,[named_table,public,{write_concurrency,true}|Opts],
+ KeyRange),
Mod = 17,
Zeros = erlang:make_tuple(Mod,0),
InitF = fun(_) -> Zeros end,
@@ -5960,7 +5963,7 @@ smp_select_delete_do(Opts) ->
element(Eq+1,Diffs0) - Deleted),
Diffs1;
_ ->
- Key = rand:uniform(10000),
+ Key = rand:uniform(KeyRange),
Eq = Key rem Mod,
case ets:insert_new(T,{Key,Key}) of
true ->
@@ -6004,12 +6007,13 @@ smp_select_replace(Config) when is_list(Config) ->
[[set,ordered_set,stim_cat_ord_set,duplicate_bag]]).
smp_select_replace_do(Opts) ->
+ KeyRange = 20,
T = ets_new(smp_select_replace,
- [public, {write_concurrency, true} | Opts]),
- ObjCount = 20,
+ [public, {write_concurrency, true} | Opts],
+ KeyRange),
InitF = fun (_) -> 0 end,
ExecF = fun (Cnt0) ->
- CounterId = rand:uniform(ObjCount),
+ CounterId = rand:uniform(KeyRange),
Match = [{{'$1', '$2'},
[{'=:=', '$1', CounterId}],
[{{'$1', {'+', '$2', 1}}}]}],
@@ -6033,11 +6037,128 @@ smp_select_replace_do(Opts) ->
FinalCounts = ets:select(T, [{{'_', '$1'}, [], ['$1']}]),
Total = lists:sum(FinalCounts),
Total = lists:sum(Results),
- ObjCount = ets:select_delete(T, [{{'_', '_'}, [], [true]}]),
+ KeyRange = ets:select_delete(T, [{{'_', '_'}, [], [true]}]),
0 = ets:info(T, size),
true = ets:delete(T),
ok.
+%% Iterate ordered_set with write_concurrency
+%% and make sure we hit all "stable" long lived keys
+%% while "volatile" objects are randomly inserted and deleted.
+smp_ordered_iteration(Config) when is_list(Config) ->
+ repeat_for_opts(fun smp_ordered_iteration_do/1,
+ [[cat_ord_set,stim_cat_ord_set]]).
+
+
+smp_ordered_iteration_do(Opts) ->
+ KeyRange = 1000,
+ KeyFun = fun(K, Type) ->
+ {K div 10, K rem 10, Type}
+ end,
+ StimKeyFun = fun(K) ->
+ KeyFun(K, element(rand:uniform(3),
+ {stable, other, volatile}))
+ end,
+ T = ets_new(smp_ordered_iteration, [public, {write_concurrency,true} | Opts],
+ KeyRange, StimKeyFun),
+ NStable = KeyRange div 4,
+ prefill_table(T, KeyRange, NStable, fun(K) -> {KeyFun(K, stable), 0} end),
+ NStable = ets:info(T, size),
+ NVolatile = KeyRange div 2,
+ prefill_table(T, KeyRange, NVolatile, fun(K) -> {KeyFun(K, volatile), 0} end),
+
+ InitF = fun (_) -> #{} end,
+ ExecF = fun (Counters) ->
+ K = rand:uniform(KeyRange),
+ Key = KeyFun(K, volatile),
+ Acc = case rand:uniform(22) of
+ R when R =< 10 ->
+ ets:insert(T, {Key}),
+ incr_counter(insert, Counters);
+ R when R =< 15 ->
+ ets:delete(T, Key),
+ incr_counter(delete, Counters);
+ R when R =< 19 ->
+ %% Delete bound key
+ ets:select_delete(T, [{{Key, '_'}, [], [true]}]),
+ incr_counter(select_delete_bk, Counters);
+ R when R =< 20 ->
+ %% Delete partially bound key
+ ets:select_delete(T, [{{{K div 10, '_', volatile}, '_'}, [], [true]}]),
+ incr_counter(select_delete_pbk, Counters);
+ R when R =< 21 ->
+ %% Replace bound key
+ ets:select_replace(T, [{{Key, '$1'}, [],
+ [{{{const,Key}, {'+','$1',1}}}]}]),
+ incr_counter(select_replace_bk, Counters);
+ _ ->
+ %% Replace partially bound key
+ ets:select_replace(T, [{{{K div 10, '_', volatile}, '$1'}, [],
+ [{{{element,1,'$_'}, {'+','$1',1}}}]}]),
+ incr_counter(select_replace_pbk, Counters)
+ end,
+ receive stop ->
+ [end_of_work | Acc]
+ after 0 ->
+ Acc
+ end
+ end,
+ FiniF = fun (Acc) -> Acc end,
+ Pids = run_sched_workers(InitF, ExecF, FiniF, infinite),
+ timer:send_after(1000, stop),
+ Rounds = fun Loop(N) ->
+ NStable = ets:select_count(T, [{{{'_', '_', stable}, '_'}, [], [true]}]),
+ NStable = count_stable(T, next, ets:first(T), 0),
+ NStable = count_stable(T, prev, ets:last(T), 0),
+ NStable = length(ets:select(T, [{{{'_', '_', stable}, '_'}, [], [true]}])),
+ NStable = length(ets:select_reverse(T, [{{{'_', '_', stable}, '_'}, [], [true]}])),
+ NStable = ets_select_chunks_count(T, [{{{'_', '_', stable}, '_'}, [], [true]}],
+ rand:uniform(5)),
+ receive stop -> N
+ after 0 -> Loop(N+1)
+ end
+ end (1),
+ [P ! stop || P <- Pids],
+ Results = wait_pids(Pids),
+ io:format("Ops = ~p\n", [maps_sum(Results)]),
+ io:format("Diff = ~p\n", [ets:info(T,size) - NStable - NVolatile]),
+ io:format("Stats = ~p\n", [ets:info(T,stats)]),
+ io:format("Rounds = ~p\n", [Rounds]),
+ true = ets:delete(T),
+ ok.
+
+incr_counter(Name, Counters) ->
+ Counters#{Name => maps:get(Name, Counters, 0) + 1}.
+
+count_stable(T, Next, {_, _, stable}=Key, N) ->
+ count_stable(T, Next, ets:Next(T, Key), N+1);
+count_stable(T, Next, {_, _, volatile}=Key, N) ->
+ count_stable(T, Next, ets:Next(T, Key), N);
+count_stable(_, _, '$end_of_table', N) ->
+ N.
+
+ets_select_chunks_count(T, MS, Chunk) ->
+ ets_select_chunks_count(ets:select(T, MS, Chunk), 0).
+
+ets_select_chunks_count('$end_of_table', N) ->
+ N;
+ets_select_chunks_count({List, Continuation}, N) ->
+ ets_select_chunks_count(ets:select(Continuation),
+ length(List) + N).
+
+maps_sum([Ma | Tail]) when is_map(Ma) ->
+ maps_sum([lists:sort(maps:to_list(Ma)) | Tail]);
+maps_sum([La, Mb | Tail]) ->
+ Lab = lists:zipwith(fun({K,Va}, {K,Vb}) -> {K,Va+Vb} end,
+ La,
+ lists:sort(maps:to_list(Mb))),
+ maps_sum([Lab | Tail]);
+maps_sum([L]) ->
+ L.
+
+
+
+
%% Test different types.
types(Config) when is_list(Config) ->
init_externals(),
@@ -6307,19 +6428,18 @@ do_work(WorksDoneSoFar, Table, ProbHelpTab, Range, Operations) ->
do_work(WorksDoneSoFar + 1, Table, ProbHelpTab, Range, Operations)
end.
-prefill_table(T, KeyRange, Num) ->
+prefill_table(T, KeyRange, Num, ObjFun) ->
Seed = rand:uniform(KeyRange),
%%io:format("prefill_table: Seed = ~p\n", [Seed]),
RState = unique_rand_start(KeyRange, Seed),
- prefill_table_loop(T, RState, Num),
- Num = ets:info(T, size).
+ prefill_table_loop(T, RState, Num, ObjFun).
-prefill_table_loop(_, _, 0) ->
+prefill_table_loop(_, _, 0, _) ->
ok;
-prefill_table_loop(T, RS0, N) ->
+prefill_table_loop(T, RS0, N, ObjFun) ->
{Key, RS1} = unique_rand_next(RS0),
- ets:insert(T, {Key}),
- prefill_table_loop(T, RS1, N-1).
+ ets:insert(T, ObjFun(Key)),
+ prefill_table_loop(T, RS1, N-1, ObjFun).
throughput_benchmark() ->
throughput_benchmark(false, not_set, not_set).
@@ -6445,7 +6565,9 @@ throughput_benchmark(TestMode, BenchmarkRunMs, RecoverTimeMs) ->
Range, Duration, RecoverTime) ->
ProbHelpTab = CalculateOpsProbHelpTab(Scenario, 0),
Table = ets:new(t, TableConfig),
- prefill_table(Table, Range, Range div 2),
+ Nobj = Range div 2,
+ prefill_table(Table, Range, Nobj, fun(K) -> {K} end),
+ Nobj = ets:info(Table, size),
SafeFixTableIfRequired(Table, Scenario, true),
ParentPid = self(),
ChildPids =
@@ -7324,20 +7446,46 @@ is_redundant_opts_combo(Opts) ->
lists:member(private, Opts) orelse
lists:member(protected, Opts)).
-ets_new(Name, Opts) ->
- ReplaceStimOrdSetHelper =
- fun (MOpts) ->
- lists:map(fun (I) ->
- case I of
- stim_cat_ord_set -> ordered_set;
- cat_ord_set -> ordered_set;
- _ -> I
- end
- end, MOpts)
- end,
+%% Add fake table option with info about key range.
+%% Will be consumed by ets_new and used for stim_cat_ord_set.
+key_range(Opts, KeyRange) ->
+ [{key_range, KeyRange} | Opts].
+
+key_range_fun(Opts, KeyRange, KeyFun) ->
+ [{key_range, KeyRange}, {key_fun, KeyFun} | Opts].
+
+ets_new(Name, Opts0) ->
+ {KeyRange, Opts1} = case lists:keytake(key_range, 1, Opts0) of
+ {value, {key_range, KR}, Rest1} ->
+ {KR, Rest1};
+ false ->
+ {1000*1000, Opts0}
+ end,
+ ets_new(Name, Opts1, KeyRange).
+
+ets_new(Name, Opts1, KeyRange) ->
+ {KeyFun, Opts2} = case lists:keytake(key_fun, 1, Opts1) of
+ {value, {key_fun, KF}, Rest2} ->
+ {KF, Rest2};
+ false ->
+ {fun id/1, Opts1}
+ end,
+ ets_new(Name, Opts2, KeyRange, KeyFun).
+
+ets_new(Name, Opts0, KeyRange, KeyFun) ->
+ {CATree, Stimulate, RevOpts} =
+ lists:foldl(fun(cat_ord_set, {false, false, Lacc}) ->
+ {true, false, [ordered_set | Lacc]};
+ (stim_cat_ord_set, {false, false, Lacc}) ->
+ {true, true, [ordered_set | Lacc]};
+ (Other, {CAT, STIM, Lacc}) ->
+ {CAT, STIM, [Other | Lacc]}
+ end,
+ {false, false, []},
+ Opts0),
+ Opts = lists:reverse(RevOpts),
EtsNewHelper =
- fun (MOpts) ->
- UseOpts = ReplaceStimOrdSetHelper(MOpts),
+ fun (UseOpts) ->
case get(ets_new_opts) of
UseOpts ->
silence; %% suppress identical table opts spam
@@ -7347,8 +7495,7 @@ ets_new(Name, Opts) ->
end,
ets:new(Name, UseOpts)
end,
- case (lists:member(stim_cat_ord_set, Opts) or
- lists:member(cat_ord_set, Opts)) andalso
+ case CATree andalso
(not lists:member({write_concurrency, false}, Opts)) andalso
(not lists:member(private, Opts)) andalso
(not lists:member(protected, Opts)) of
@@ -7364,62 +7511,54 @@ ets_new(Name, Opts) ->
false -> [public|NewOpts1]
end,
T = EtsNewHelper(NewOpts2),
- case lists:member(stim_cat_ord_set, Opts) of
- true -> stimulate_contention(T);
- false -> ok
+ case Stimulate of
+ false -> ok;
+ true -> stimulate_contention(T, KeyRange, KeyFun)
end,
T;
false ->
EtsNewHelper(Opts)
end.
-% This function do the following to the input ETS table:
-% 1. Perform a number of concurrent insert operations
-% 2. Remove all inserted items
-%
% The purpose of this function is to stimulate fine grained locking in
% tables of types ordered_set with the write_concurrency options
-% turned on. Such tables are implemented as CA trees* and thus
-% activates fine grained locking only when lock contention is
-% detected.
-%
-% A Contention Adapting Approach to Concurrent Ordered Sets
-% Journal of Parallel and Distributed Computing, 2018
-% Kjell Winblad and Konstantinos Sagonas
-% https://doi.org/10.1016/j.jpdc.2017.11.007
-stimulate_contention(T) ->
- NrOfSchedulers = erlang:system_info(schedulers),
- ParentPid = self(),
- KeyRange = 100000,
- ChildPids =
- lists:map(fun(_N) ->
- spawn(fun() ->
- receive start -> ok end,
- stimulate_contention_do_inserts(T, KeyRange, 0),
- ParentPid ! done
- end)
- end, lists:seq(1, NrOfSchedulers)),
- lists:foreach(fun(Pid) -> Pid ! start end, ChildPids),
- timer:sleep(100),
- lists:foreach(fun(Pid) -> Pid ! stop end, ChildPids),
- lists:foreach(fun(_P) -> receive done -> ok end end, ChildPids),
- lists:foreach(fun(N) -> ets:delete(T, N) end, lists:seq(0, KeyRange)).
-
-
-
-stimulate_contention_do_inserts(T, KeyRange, 0) ->
- OpsBetweenStopCheck = 10000,
- receive
- stop -> ok
- after
- 0 -> stimulate_contention_do_inserts(T, KeyRange, OpsBetweenStopCheck)
- end;
-stimulate_contention_do_inserts(T, KeyRange, OpsToNextStopCheck) ->
- R = trunc(KeyRange * rand:uniform()),
- ets:insert(T,{R,R,R}),
- stimulate_contention_do_inserts(T, KeyRange, OpsToNextStopCheck - 1).
-
+% turned on. The erts_debug feature 'ets_force_split' is used to easier
+% generate a routing tree with fine grained locking without having to
+% provoke lots of actual lock contentions.
+stimulate_contention(Tid, KeyRange, KeyFun) ->
+ T = case Tid of
+ A when is_atom(A) -> ets:whereis(A);
+ _ -> Tid
+ end,
+ erts_debug:set_internal_state(ets_force_split, {T, true}),
+ Num = case KeyRange > 50 of
+ true -> 50;
+ false -> KeyRange
+ end,
+ Seed = rand:uniform(KeyRange),
+ %%io:format("prefill_table: Seed = ~p\n", [Seed]),
+ RState = unique_rand_start(KeyRange, Seed),
+ stim_inserter_loop(T, RState, Num, KeyFun),
+ Num = ets:info(T, size),
+ ets:match_delete(T, {'$1','$1','$1'}),
+ 0 = ets:info(T, size),
+ erts_debug:set_internal_state(ets_force_split, {T, false}),
+ case ets:info(T,stats) of
+ {0, _, _} ->
+ io:format("No routing nodes in table?\n"
+ "Debug feature 'ets_force_split' does not seem to work.\n", []),
+ ct:fail("No ets_force_split?");
+ Stats ->
+ io:format("stimulated ordered_set: ~p\n", [Stats])
+ end.
+stim_inserter_loop(_, _, 0, _) ->
+ ok;
+stim_inserter_loop(T, RS0, N, KeyFun) ->
+ {K, RS1} = unique_rand_next(RS0),
+ Key = KeyFun(K),
+ ets:insert(T, {Key, Key, Key}),
+ stim_inserter_loop(T, RS1, N-1, KeyFun).
do_tc(Do, Report) ->
T1 = erlang:monotonic_time(),
@@ -7478,4 +7617,5 @@ dquad(Prime, X, Seed) ->
%% Primes where P rem 4 == 3.
primes_3mod4() ->
[103, 211, 503, 1019, 2003, 5003, 10007, 20011, 50023,
- 100003, 200003, 500083, 1000003, 2000003].
+ 100003, 200003, 500083, 1000003, 2000003, 5000011,
+ 10000019, 20000003, 50000047, 100000007].