aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-09-21 14:22:32 +0200
committerGitHub <[email protected]>2018-09-21 14:22:32 +0200
commitf26d11aa57aff58eec7ba743916981964da711ca (patch)
treee09ea7f7e9141fa357593ea88c53f0bf73ec3e78 /erts
parent6e014f7780903757119ef8f20b881613cdfe8d42 (diff)
parent9c96967fbc6286f27b9be8b04afcfe34b362b2ef (diff)
downloadotp-f26d11aa57aff58eec7ba743916981964da711ca.tar.gz
otp-f26d11aa57aff58eec7ba743916981964da711ca.tar.bz2
otp-f26d11aa57aff58eec7ba743916981964da711ca.zip
Merge PR-1952 from kjellwinblad/ca_tree_pull_request
Add a more scalable ETS ordered_set implementation
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/Makefile.in2
-rw-r--r--erts/emulator/beam/erl_db.c40
-rw-r--r--erts/emulator/beam/erl_db.h2
-rw-r--r--erts/emulator/beam/erl_db_catree.c2174
-rw-r--r--erts/emulator/beam/erl_db_catree.h91
-rw-r--r--erts/emulator/beam/erl_db_tree.c914
-rw-r--r--erts/emulator/beam/erl_db_tree_util.h151
-rw-r--r--erts/emulator/beam/erl_db_util.h29
-rw-r--r--erts/emulator/beam/erl_lock_check.c54
-rw-r--r--erts/emulator/beam/erl_lock_check.h4
10 files changed, 3064 insertions, 397 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in
index 054692819e..eeb9577a33 100644
--- a/erts/emulator/Makefile.in
+++ b/erts/emulator/Makefile.in
@@ -874,7 +874,7 @@ RUN_OBJS += \
$(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o \
$(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \
$(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \
- $(OBJDIR)/erl_io_queue.o
+ $(OBJDIR)/erl_io_queue.o $(OBJDIR)/erl_db_catree.o
LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o
NIF_OBJS = \
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index c009a3bde8..1df972f4b6 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -90,7 +90,8 @@ enum DbIterSafety {
ITER_SAFE /* No need to fixate at all */
};
# define ITERATION_SAFETY(Proc,Tab) \
- ((IS_TREE_TABLE((Tab)->common.status) || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \
+ ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \
+ || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \
: (((Tab)->common.status & DB_FINE_LOCKED) ? ITER_UNSAFE : ITER_SAFE_LOCKED))
#define DID_TRAP(P,Ret) (!is_value(Ret) && ((P)->freason == TRAP))
@@ -359,6 +360,7 @@ typedef enum {
extern DbTableMethod db_hash;
extern DbTableMethod db_tree;
+extern DbTableMethod db_catree;
int user_requested_db_max_tabs;
int erts_ets_realloc_always_moves;
@@ -414,6 +416,15 @@ free_dbtable(void *vtb)
tb->common.fixations);
}
#endif
+ if (erts_atomic_read_nob(&tb->common.memory_size) > sizeof(DbTable)) {
+ /* The CA tree implementation use delayed freeing and the DbTable needs to
+ be freed after all other memory blocks that are allocated by the table. */
+ erts_schedule_thr_prgr_later_cleanup_op(free_dbtable,
+ (void *) tb,
+ &tb->release.data,
+ sizeof(DbTable));
+ return;
+ }
erts_rwmtx_destroy(&tb->common.rwlock);
erts_mtx_destroy(&tb->common.fixlock);
ASSERT(is_immed(tb->common.heir_data));
@@ -1076,7 +1087,7 @@ BIF_RETTYPE ets_update_element_3(BIF_ALIST_3)
DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3);
UseTmpHeap(2,BIF_P);
- if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
+ if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
goto bail_out;
}
if (is_tuple(BIF_ARG_3)) {
@@ -1165,7 +1176,7 @@ do_update_counter(Process *p, DbTable* tb,
UseTmpHeap(5, p);
- if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) {
+ if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) {
goto bail_out;
}
if (is_integer(arg3)) { /* Incr */
@@ -1647,15 +1658,15 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2)
val = CAR(list_val(list));
if (val == am_bag) {
status |= DB_BAG;
- status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET);
+ status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
}
else if (val == am_duplicate_bag) {
status |= DB_DUPLICATE_BAG;
- status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET);
+ status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET);
}
else if (val == am_ordered_set) {
status |= DB_ORDERED_SET;
- status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG);
+ status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_CA_ORDERED_SET);
}
else if (is_tuple(val)) {
Eterm *tp = tuple_val(val);
@@ -1716,7 +1727,13 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2)
if (is_not_nil(list)) { /* bad opt or not a well formed list */
BIF_ERROR(BIF_P, BADARG);
}
- if (IS_HASH_TABLE(status)) {
+ if (IS_TREE_TABLE(status) && is_fine_locked && !(status & DB_PRIVATE)) {
+ meth = &db_catree;
+ status |= DB_CA_ORDERED_SET;
+ status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_ORDERED_SET);
+ status |= DB_FINE_LOCKED;
+ }
+ else if (IS_HASH_TABLE(status)) {
meth = &db_hash;
if (is_fine_locked && !(status & DB_PRIVATE)) {
status |= DB_FINE_LOCKED;
@@ -3506,6 +3523,7 @@ void init_db(ErtsDbSpinCount db_spin_count)
db_initialize_hash();
db_initialize_tree();
+ db_initialize_catree();
/* Non visual BIF to trap to. */
erts_init_trap_export(&ets_select_delete_continue_exp,
@@ -4114,6 +4132,8 @@ static Eterm table_info(Process* p, DbTable* tb, Eterm What)
ret = am_duplicate_bag;
} else if (tb->common.status & DB_ORDERED_SET) {
ret = am_ordered_set;
+ } else if (tb->common.status & DB_CA_ORDERED_SET) {
+ ret = am_ordered_set;
} else { /*TT*/
ASSERT(tb->common.status & DB_BAG);
ret = am_bag;
@@ -4409,6 +4429,12 @@ void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable) {
if(IS_HASH_TABLE(tb->common.status)) {
erts_lcnt_enable_db_hash_lock_count(&tb->hash, enable);
+ } else if(IS_CATREE_TABLE(tb->common.status)) {
+ /* erts_lcnt_enable_db_catree_lock_count is not thread safe so
+ the table needs to get locked */
+ db_lock(tb, LCK_WRITE);
+ erts_lcnt_enable_db_catree_lock_count(&tb->catree, enable);
+ db_unlock(tb, LCK_WRITE);
}
}
diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h
index 23975d208f..45d120ac0e 100644
--- a/erts/emulator/beam/erl_db.h
+++ b/erts/emulator/beam/erl_db.h
@@ -66,6 +66,7 @@ typedef struct {
#include "erl_db_util.h" /* Flags */
#include "erl_db_hash.h" /* DbTableHash */
#include "erl_db_tree.h" /* DbTableTree */
+#include "erl_db_catree.h" /* DbTableCATree */
/*TT*/
Uint erts_get_ets_misc_mem_size(void);
@@ -90,6 +91,7 @@ union db_table {
DbTableCommon common; /* Any type of db table */
DbTableHash hash; /* Linear hash array specific data */
DbTableTree tree; /* AVL tree specific data */
+ DbTableCATree catree; /* CA tree specific data */
DbTableRelease release;
/*TT*/
};
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
new file mode 100644
index 0000000000..37a299df35
--- /dev/null
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -0,0 +1,2174 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Implementation of ETS ordered_set table type with
+ * fine-grained synchronization.
+ *
+ * Author: Kjell Winblad
+ *
+ * This implementation is based on the contention adapting search tree
+ * (CA tree). The CA tree is a concurrent data structure that
+ * dynamically adapts its synchronization granularity based on how
+ * much contention is detected in locks. The following publication
+ * contains a detailed description of CA trees:
+ *
+ * A Contention Adapting Approach to Concurrent Ordered Sets
+ * Journal of Parallel and Distributed Computing, 2018
+ * Kjell Winblad and Konstantinos Sagonas
+ * https://doi.org/10.1016/j.jpdc.2017.11.007
+ *
+ * The following publication may also be interesting as it discusses
+ * how the CA tree can be used as an ETS ordered_set table type
+ * backend:
+ *
+ * More Scalable Ordered Set for ETS Using Adaptation
+ * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
+ * Kjell Winblad and Konstantinos Sagonas
+ * https://doi.org/10.1145/2633448.2633455
+ *
+ * This implementation of the ordered_set ETS table type is only
+ * activated when the options {write_concurrency, true}, public and
+ * ordered_set are passed to the ets:new/2 function. This
+ * implementation is expected to scale better than the default
+ * implementation (located in "erl_db_tree.c") when concurrent
+ * processes use the following ETS operations to operate on a table:
+ *
+ * delete/2, delete_object/2, first/1, insert/2 (single object),
+ * insert_new/2 (single object), lookup/2, lookup_element/2, member/2,
+ * next/2, take/2 and update_element/3 (single object).
+ *
+ * Currently, the implementation does not have scalable support for
+ * the other operations (e.g., select/2). These operations are handled
+ * by merging all locks so that all terms get protected by a single
+ * lock. This implementation may thus perform worse than the default
+ * implementation in some scenarios. For example, when concurrent
+ * processes access a table with the operations insert/2, delete/2 and
+ * select/2, the insert/2 and delete/2 operations will trigger splits
+ * of locks (to get more fine-grained synchronization) but this will
+ * quickly be undone by the select/2 operation if this operation is
+ * also called frequently.
+ *
+ * The default implementation has a static stack optimization (see
+ * get_static_stack in erl_db_tree.c). This implementation does not
+ * have such an optimization as it induces bad scalability when
+ * concurrent read operations are frequent (they all try to get hold
+ * of the same stack). The default implementation may thus perform
+ * better compared to this implementation in scenarios where the
+ * static stack optimization is useful. One such scenario is when only
+ * one process is accessing the table and this process is traversing
+ * the table with a sequence of next/2 calls.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+#include "erl_vm.h"
+#include "global.h"
+#include "erl_process.h"
+#include "error.h"
+#define ERTS_WANT_DB_INTERNAL__
+#include "erl_db.h"
+#include "bif.h"
+#include "big.h"
+#include "erl_binary.h"
+
+#include "erl_db_catree.h"
+#include "erl_db_tree.h"
+#include "erl_db_tree_util.h"
+
+/*
+** Forward declarations
+*/
+
+static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left);
+static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left);
+static DbTableCATreeNode *catree_first_base_node_from_free_list(DbTableCATree *tb);
+
+/* Method interface functions */
+static int db_first_catree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_next_catree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_last_catree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_prev_catree(Process *p, DbTable *tbl,
+ Eterm key,
+ Eterm *ret);
+static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail);
+static int db_get_catree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_get_element_catree(Process *p, DbTable *tbl,
+ Eterm key,int ndex,
+ Eterm *ret);
+static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
+static int db_slot_catree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret);
+static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reversed, Eterm *ret);
+static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reversed, Eterm *ret);
+static int db_select_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_count_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_delete_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
+static void db_print_catree(fmtfn_t to, void *to_arg,
+ int show, DbTable *tbl);
+static int db_free_table_catree(DbTable *tbl);
+static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
+static void db_foreach_offheap_catree(DbTable *,
+ void (*)(ErlOffHeap *, void *),
+ void *);
+static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds);
+static int
+db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
+ DbUpdateHandle*);
+static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+
+/*
+** External interface
+*/
+DbTableMethod db_catree =
+{
+ db_create_catree,
+ db_first_catree,
+ db_next_catree,
+ db_last_catree,
+ db_prev_catree,
+ db_put_catree,
+ db_get_catree,
+ db_get_element_catree,
+ db_member_catree,
+ db_erase_catree,
+ db_erase_object_catree,
+ db_slot_catree,
+ db_select_chunk_catree,
+ db_select_catree,
+ db_select_delete_catree,
+ db_select_continue_catree,
+ db_select_delete_continue_catree,
+ db_select_count_catree,
+ db_select_count_continue_catree,
+ db_select_replace_catree,
+ db_select_replace_continue_catree,
+ db_take_catree,
+ db_delete_all_objects_catree,
+ db_free_table_catree,
+ db_free_table_continue_catree,
+ db_print_catree,
+ db_foreach_offheap_catree,
+ db_lookup_dbterm_catree,
+ db_finalize_dbterm_catree
+
+};
+
+/*
+ * Constants
+ */
+
+#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 200
+#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
+#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
+#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
+#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
+#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 14
+
+/*
+ * Internal CA tree related helper functions and macros
+ */
+
+#define GET_ROUTE_NODE_KEY(node) (node->baseOrRoute.route.key.tpl[0])
+#define GET_BASE_NODE_LOCK(node) (&(node->baseOrRoute.base.lock))
+#define GET_ROUTE_NODE_LOCK(node) (&(node->baseOrRoute.route.lock))
+
+
+/* Helpers for reading and writing shared atomic variables */
+
+/* No memory barrier */
+#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
+#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.left)))
+#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
+#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
+#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+
+
+/* Release or acquire barriers */
+#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
+#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.left)))
+#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
+#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
+#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+
+/* Compares a key to the key in a route node */
+static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
+ Eterm key,
+ DbTableCATreeNode *obj)
+{
+ return CMP(key, GET_ROUTE_NODE_KEY(obj));
+}
+
+static ERTS_INLINE void push_node_dyn_array(DbTable *tb,
+ CATreeNodeStack *stack,
+ DbTableCATreeNode *node)
+{
+ int i;
+ if (stack->pos == stack->size) {
+ DbTableCATreeNode **newArray =
+ erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
+ sizeof(DbTableCATreeNode*) * (stack->size*2));
+ for (i = 0; i < stack->pos; i++) {
+ newArray[i] = stack->array[i];
+ }
+ if (stack->size > STACK_NEED) {
+ /* Dynamically allocated array that needs to be deallocated */
+ erts_db_free(ERTS_ALC_T_DB_STK, tb,
+ stack->array,
+ sizeof(DbTableCATreeNode *) * stack->size);
+ }
+ stack->array = newArray;
+ stack->size = stack->size*2;
+ }
+ PUSH_NODE(stack, node);
+}
+
+/*
+ * Used by the split_tree function
+ */
+static ERTS_INLINE
+int less_than_two_elements(TreeDbTerm *root)
+{
+ return root == NULL || (root->left == NULL && root->right == NULL);
+}
+
+/*
+ * Inserts a TreeDbTerm into a tree. Returns the new root.
+ */
+static ERTS_INLINE
+TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
+ TreeDbTerm *insert_to_root,
+ TreeDbTerm *value_to_insert) {
+ /* Non recursive insertion in AVL tree, building our own stack */
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm * base = insert_to_root;
+ TreeDbTerm **this = &base;
+ Sint c;
+ Eterm key;
+ int dir;
+ TreeDbTerm *p1, *p2, *p;
+
+ key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);
+
+ dstack[dpos++] = DIR_END;
+ for (;;)
+ if (!*this) { /* Found our place */
+ state = 1;
+ *this = value_to_insert;
+ (*this)->balance = 0;
+ (*this)->left = (*this)->right = NULL;
+ break;
+ } else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
+ /* go lefts */
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else { /* go right */
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ }
+
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ p = *this;
+ if (dir == DIR_LEFT) {
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ break;
+ case -1: /* The icky case */
+ p1 = p->left;
+ if (p1->balance == -1) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RR rotation */
+ p2 = p1->right;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (p2->balance == -1) ? +1 : 0;
+ p1->balance = (p2->balance == 1) ? -1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ } else { /* dir == DIR_RIGHT */
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ break;
+ case 1:
+ p1 = p->right;
+ if (p1->balance == 1) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (p2->balance == 1) ? -1 : 0;
+ p1->balance = (p2->balance == -1) ? 1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ }
+ }
+ return base;
+}
+
+/*
+ * Split an AVL tree into two trees. The function stores the node
+ * containing the "split key" in the write back parameter
+ * split_key_wb. The function stores the left tree containing the keys
+ * that are smaller than the "split key" in the write back parameter
+ * left_wb and the tree containing the rest of the keys in the write
+ * back parameter right_wb.
+ */
+static void split_tree(DbTableCommon *tb,
+ TreeDbTerm *root,
+ TreeDbTerm **split_key_node_wb,
+ TreeDbTerm **left_wb,
+ TreeDbTerm **right_wb) {
+ TreeDbTerm * split_node = NULL;
+ TreeDbTerm * left_root;
+ TreeDbTerm * right_root;
+ if (root->left == NULL) { /* To get non empty split */
+ *right_wb = root->right;
+ *split_key_node_wb = root->right;
+ root->right = NULL;
+ root->balance = 0;
+ *left_wb = root;
+ return;
+ }
+ split_node = root;
+ left_root = split_node->left;
+ split_node->left = NULL;
+ right_root = split_node->right;
+ split_node->right = NULL;
+ right_root = insert_TreeDbTerm(tb,
+ right_root,
+ split_node);
+ *split_key_node_wb = split_node;
+ *left_wb = left_root;
+ *right_wb = right_root;
+}
+
+/*
+ * Used by the join_trees function
+ */
+static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
+{
+ if(root == NULL) {
+ return 0;
+ } else {
+ TreeDbTerm * current_node = root;
+ int hight_so_far = 1;
+ while (current_node->left != NULL || current_node->right != NULL) {
+ if (current_node->balance == -1) {
+ current_node = current_node->left;
+ } else {
+ current_node = current_node->right;
+ }
+ hight_so_far = hight_so_far + 1;
+ }
+ return hight_so_far;
+ }
+}
+
+/*
+ * Used by the join_trees function
+ */
+static ERTS_INLINE
+TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm **this = root;
+ int dir;
+ TreeDbTerm *q = NULL;
+
+ dstack[dpos++] = DIR_END;
+ for (;;) {
+ if (!*this) { /* Failure */
+ return NULL;
+ } else if (is_min && (*this)->left != NULL) {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else if (!is_min && (*this)->right != NULL) {
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ } else { /* Min value, found the one to splice out */
+ q = (*this);
+ if (q->right == NULL) {
+ (*this) = q->left;
+ state = 1;
+ } else if (q->left == NULL) {
+ (*this) = q->right;
+ state = 1;
+ }
+ break;
+ }
+ }
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ if (dir == DIR_LEFT) {
+ state = tree_balance_left(this);
+ } else {
+ state = tree_balance_right(this);
+ }
+ }
+ return q;
+}
+
+#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
+#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)
+
+/*
+ * Joins two AVL trees where all the keys in the left one are smaller
+ * then the keys in the right one and returns the resulting tree.
+ *
+ * The algorithm is described on page 474 in D. E. Knuth. The Art of
+ * Computer Programming: Sorting and Searching,
+ * vol. 3. Addison-Wesley, 2nd edition, 1998.
+ */
+static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
+ TreeDbTerm *right_root_param)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 1;
+ TreeDbTerm **this;
+ int dir;
+ TreeDbTerm *p1, *p2, *p;
+ TreeDbTerm *left_root = left_root_param;
+ TreeDbTerm *right_root = right_root_param;
+ int left_height;
+ int right_height;
+ int current_height;
+ dstack[dpos++] = DIR_END;
+ if (left_root == NULL) {
+ return right_root;
+ } else if (right_root == NULL) {
+ return left_root;
+ }
+
+ left_height = compute_tree_hight(left_root);
+ right_height = compute_tree_hight(right_root);
+ if (left_height >= right_height) {
+ TreeDbTerm * new_root =
+ LINKOUT_MIN_TREE_NODE(&right_root);
+ int new_right_height = compute_tree_hight(right_root);
+ TreeDbTerm * current_node = left_root;
+ this = &left_root;
+ current_height = left_height;
+ while(current_height > new_right_height + 1) {
+ if (current_node->balance == -1) {
+ current_height = current_height - 2;
+ } else {
+ current_height = current_height - 1;
+ }
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ current_node = current_node->right;
+ }
+ new_root->left = current_node;
+ new_root->right = right_root;
+ new_root->balance = new_right_height - current_height;
+ *this = new_root;
+ } else {
+ /* This case is symmetric to the previous case */
+ TreeDbTerm * new_root =
+ LINKOUT_MAX_TREE_NODE(&left_root);
+ int new_left_height = compute_tree_hight(left_root);
+ TreeDbTerm * current_node = right_root;
+ this = &right_root;
+ current_height = right_height;
+ while (current_height > new_left_height + 1) {
+ if (current_node->balance == 1) {
+ current_height = current_height - 2;
+ } else {
+ current_height = current_height - 1;
+ }
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ current_node = current_node->left;
+ }
+ new_root->right = current_node;
+ new_root->left = left_root;
+ new_root->balance = current_height - new_left_height;
+ *this = new_root;
+ }
+ /* Now we need to continue as if this was during the insert */
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ p = *this;
+ if (dir == DIR_LEFT) {
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ break;
+ case -1: /* The icky case */
+ p1 = p->left;
+ if (p1->balance == -1) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RR rotation */
+ p2 = p1->right;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (p2->balance == -1) ? +1 : 0;
+ p1->balance = (p2->balance == 1) ? -1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ } else { /* dir == DIR_RIGHT */
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ break;
+ case 1:
+ p1 = p->right;
+ if (p1->balance == 1) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (p2->balance == 1) ? -1 : 0;
+ p1->balance = (p2->balance == -1) ? 1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ }
+ }
+ /* Return the joined tree */
+ if (left_height >= right_height) {
+ return left_root;
+ } else {
+ return right_root;
+ }
+}
+
+
+static ERTS_INLINE
+int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock);
+}
+
+/*
+ * Locks a base node without adjusting the lock statistics
+ */
+static ERTS_INLINE
+void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node)
+{
+ erts_rwmtx_rwlock(&base_node->lock);
+}
+
+/*
+ * Locks a base node and adjusts the lock statistics according to if
+ * the lock was contended or not
+ */
+static ERTS_INLINE
+void wlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ if (try_wlock_base_node(base_node)) {
+ /* The lock is contended */
+ wlock_base_node_no_stats(base_node);
+ base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ } else {
+ base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ }
+}
+
+static ERTS_INLINE
+void wunlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ erts_rwmtx_rwunlock(&base_node->lock);
+}
+
+static ERTS_INLINE
+void rlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ erts_rwmtx_rlock(&base_node->lock);
+}
+
+static ERTS_INLINE
+void runlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ erts_rwmtx_runlock(&base_node->lock);
+}
+
+static ERTS_INLINE
+void lock_route_node(DbTableCATreeRouteNode *route_node)
+{
+ erts_mtx_lock(&route_node->lock);
+}
+
+static ERTS_INLINE
+void unlock_route_node(DbTableCATreeRouteNode *route_node)
+{
+ erts_mtx_unlock(&route_node->lock);
+}
+
+
+/*
+ * The following macros are used to create the ETS functions that only
+ * need to access one element (e.g. db_put_catree, db_get_catree and
+ * db_erase_catree).
+ */
+
+#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
+ int retry; \
+ DbTableCATreeNode *current_node; \
+ DbTableCATreeNode *prev_node; \
+ DbTableCommon* common_table_data = &tb->common; \
+ DbTableCATreeBaseNode *base_node; \
+ int current_level; \
+ (void)prev_node; \
+ do { \
+ retry = 0; \
+ current_node = GET_ROOT_ACQB(tb); \
+ prev_node = NULL; \
+ current_level = 0; \
+ while ( ! current_node->is_base_node ) { \
+ current_level = current_level + 1; \
+ prev_node = current_node; \
+ if (cmp_key_route(common_table_data,key,current_node) < 0) { \
+ current_node = GET_LEFT_ACQB(current_node); \
+ } else { \
+ current_node = GET_RIGHT_ACQB(current_node); \
+ } \
+ } \
+ base_node = &current_node->baseOrRoute.base; \
+ LOCK(base_node); \
+ if ( ! base_node->is_valid ) { \
+ /* Retry */ \
+ UNLOCK(base_node); \
+ retry = 1; \
+ } \
+ } while(retry);
+
+
+#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
+ if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
+ split_catree(&tb->common, prev_node, current_node); \
+ } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
+ join_catree(tb, prev_node, current_node); \
+ } else { \
+ wunlock_base_node(base_node); \
+ }
+
+#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
+ static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
+ Eterm key, \
+ PARAMETERS){ \
+ int result; \
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
+ result = SEQUENTAIL_CALL; \
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
+ return result; \
+ }
+
+
+#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
+ static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
+ Eterm key, \
+ PARAMETERS){ \
+ int result; \
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
+ result = SEQUENTAIL_CALL; \
+ runlock_base_node(base_node); \
+ return result; \
+ }
+
+
+
+static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
+{
+ DbTableCATreeNode *new_base_node_container =
+ erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof(DbTableCATreeNode));
+ DbTableCATreeBaseNode *new_base_node =
+ &new_base_node_container->baseOrRoute.base;
+ erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
+ new_base_node_container->is_base_node = 1;
+ new_base_node->root = NULL;
+ if (tb->common.type & DB_FREQ_READ)
+ rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
+ if (erts_ets_rwmtx_spin_count >= 0)
+ rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
+ erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
+ "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
+ new_base_node->lock_statistics = 0;
+ new_base_node->is_valid = 1;
+ new_base_node->tab = (DbTable *) tb;
+ return new_base_node_container;
+}
+
+static DbTableCATreeNode*
+create_catree_route_node(DbTableCommon * common_table_data,
+ DbTableCATreeNode *left,
+ DbTableCATreeNode *right,
+ DbTerm * keyTerm)
+{
+ Eterm* top;
+ Eterm key = GETKEY(common_table_data,keyTerm->tpl);
+ int key_size = size_object(key);
+ Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
+ offsetof(DbTableCATreeRouteNode,key);
+ size_t route_node_container_size =
+ offset +
+ sizeof(DbTerm) +
+ sizeof(Eterm)*key_size;
+ ErlOffHeap tmp_offheap;
+ byte* new_route_node_container_bytes =
+ erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) common_table_data,
+ route_node_container_size);
+ DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
+ DbTableCATreeNode *new_route_node_container =
+ (DbTableCATreeNode*)new_route_node_container_bytes;
+ DbTableCATreeRouteNode *new_route_node =
+ &new_route_node_container->baseOrRoute.route;
+ new_route_node->tab = (DbTable *)common_table_data;
+ if (key_size != 0) {
+ newp->size = key_size;
+ top = &newp->tpl[1];
+ tmp_offheap.first = NULL;
+ newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
+ newp->first_oh = tmp_offheap.first;
+ } else {
+ newp->size = key_size;
+ newp->first_oh = NULL;
+ newp->tpl[0] = key;
+ }
+ new_route_node_container->is_base_node = 0;
+ new_route_node->is_valid = 1;
+ erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
+ erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
+ erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
+ NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
+ return new_route_node_container;
+}
+
+static void free_catree_base_node(void* base_node_container_ptr)
+{
+ DbTableCATreeNode *base_node_container =
+ (DbTableCATreeNode *)base_node_container_ptr;
+ DbTableCATreeBaseNode *base_node =
+ &base_node_container->baseOrRoute.base;
+ erts_rwmtx_destroy(&base_node->lock);
+ erts_db_free(ERTS_ALC_T_DB_TABLE,
+ base_node->tab, base_node_container,
+ sizeof(DbTableCATreeNode));
+}
+
+static void free_catree_routing_node(void *route_node_container_ptr)
+{
+ size_t route_node_container_size;
+ byte* route_node_container_bytes = route_node_container_ptr;
+ DbTableCATreeNode *route_node_container =
+ (DbTableCATreeNode *)route_node_container_bytes;
+ DbTableCATreeRouteNode *route_node =
+ &route_node_container->baseOrRoute.route;
+ int key_size = route_node->key.size;
+ Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
+ offsetof(DbTableCATreeRouteNode,key);
+ ErlOffHeap tmp_oh;
+ DbTerm* db_term = (DbTerm*) (route_node_container_bytes + offset);
+ erts_mtx_destroy(&route_node->lock);
+ route_node_container_size =
+ offset +
+ sizeof(DbTerm) +
+ sizeof(Eterm)*key_size;
+ if (key_size != 0) {
+ tmp_oh.first = db_term->first_oh;
+ erts_cleanup_offheap(&tmp_oh);
+ }
+ erts_db_free(ERTS_ALC_T_DB_TABLE,
+ route_node->tab,
+ route_node_container,
+ route_node_container_size);
+}
+
+/*
+ * Returns the parent routing node of the specified
+ * route_node_container if such a routing node exists or NULL if
+ * route_node_container is attached to the root
+ */
+static ERTS_INLINE DbTableCATreeNode *
+parent_of(DbTableCATree *tb,
+ DbTableCATreeNode *route_node_container)
+{
+
+ Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
+ DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev_node = NULL;
+ if (current_node == route_node_container) {
+ return NULL;
+ }
+ while (current_node != route_node_container) {
+ prev_node = current_node;
+ if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
+ current_node = GET_LEFT_ACQB(current_node);
+ } else {
+ current_node = GET_RIGHT_ACQB(current_node);
+ }
+ }
+ return prev_node;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+leftmost_base_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ while (!node->is_base_node) {
+ node = GET_LEFT_ACQB(node);
+ }
+ return node;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+rightmost_base_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ while (!node->is_base_node) {
+ node = GET_RIGHT_ACQB(node);
+ }
+ return node;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+leftmost_route_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ DbTableCATreeNode *prev_node = NULL;
+ while (!node->is_base_node) {
+ prev_node = node;
+ node = GET_LEFT_ACQB(node);
+ }
+ if (prev_node == NULL) {
+ return NULL;
+ } else {
+ return prev_node;
+ }
+}
+
+static ERTS_INLINE DbTableCATreeNode*
+rightmost_route_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode * node = root;
+ DbTableCATreeNode * prev_node = NULL;
+ while (!node->is_base_node) {
+ prev_node = node;
+ node = GET_RIGHT_ACQB(node);
+ }
+ if (prev_node == NULL) {
+ return NULL;
+ } else {
+ return prev_node;
+ }
+}
+
+static ERTS_INLINE DbTableCATreeNode*
+leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
+{
+ DbTableCATreeNode * node = root;
+ while (!node->is_base_node) {
+ PUSH_NODE(stack, node);
+ node = GET_LEFT_ACQB(node);
+ }
+ return node;
+}
+
+static ERTS_INLINE DbTableCATreeNode*
+get_next_base_node_and_path(DbTableCommon *common_table_data,
+ DbTableCATreeNode *base_node,
+ CATreeNodeStack *stack)
+{
+ if (EMPTY_NODE(stack)) { /* The parent of b is the root */
+ return NULL;
+ } else {
+ if (GET_LEFT(TOP_NODE(stack)) == base_node) {
+ return leftmost_base_node_and_path(
+ GET_RIGHT_ACQB(TOP_NODE(stack)),
+ stack);
+ } else {
+ Eterm pkey =
+ TOP_NODE(stack)->baseOrRoute.route.key.tpl[0]; /* pKey = key of parent */
+ POP_NODE(stack);
+ while (!EMPTY_NODE(stack)) {
+ if (TOP_NODE(stack)->baseOrRoute.route.is_valid &&
+ cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
+ return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
+ } else {
+ POP_NODE(stack);
+ }
+ }
+ }
+ return NULL;
+ }
+}
+
+static ERTS_INLINE void
+clone_stack(CATreeNodeStack *search_stack_ptr,
+ CATreeNodeStack *search_stack_copy_ptr)
+{
+ int i;
+ search_stack_copy_ptr->pos = search_stack_ptr->pos;
+ for (i = 0; i < search_stack_ptr->pos; i++) {
+ search_stack_copy_ptr->array[i] = search_stack_ptr->array[i];
+ }
+}
+
+
+
+static ERTS_INLINE DbTableCATreeNode*
+lock_first_base_node(DbTable *tbl,
+ CATreeNodeStack *search_stack_ptr,
+ CATreeNodeStack *locked_base_nodes_stack_ptr)
+{
+ int retry;
+ DbTableCATreeNode *current_node;
+ DbTableCATreeBaseNode *base_node;
+ DbTableCATree* tb = &tbl->catree;
+ do {
+ retry = 0;
+ current_node = GET_ROOT_ACQB(tb);
+ while ( ! current_node->is_base_node ) {
+ PUSH_NODE(search_stack_ptr, current_node);
+ current_node = GET_LEFT_ACQB(current_node);
+ }
+ base_node = &current_node->baseOrRoute.base;
+ rlock_base_node(base_node);
+ if ( ! base_node->is_valid ) {
+ /* Retry */
+ runlock_base_node(base_node);
+ retry = 1;
+ }
+ } while(retry);
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
+ return current_node;
+}
+
+static ERTS_INLINE DbTableCATreeBaseNode*
+find_and_lock_next_base_node_and_path(DbTable *tbl,
+ CATreeNodeStack **search_stack_ptr_ptr,
+ CATreeNodeStack **search_stack_copy_ptr_ptr,
+ CATreeNodeStack *locked_base_nodes_stack_ptr)
+{
+ DbTableCATreeNode *current_node;
+ DbTableCATreeBaseNode *base_node;
+ CATreeNodeStack * tmp_stack_ptr;
+ DbTableCommon* common_table_data;
+ retry_find_and_lock_next_base_node:
+ current_node = TOP_NODE(locked_base_nodes_stack_ptr);
+ common_table_data = &tbl->common;
+ clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
+ current_node =
+ get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
+ if (current_node == NULL) {
+ return NULL;
+ }
+ base_node = &current_node->baseOrRoute.base;
+ rlock_base_node(base_node);
+ if ( ! base_node->is_valid ) {
+ /* Retry */
+ runlock_base_node(base_node);
+ /* Revert to previous state */
+ current_node = TOP_NODE(locked_base_nodes_stack_ptr);
+ tmp_stack_ptr = *search_stack_ptr_ptr;
+ *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
+ *search_stack_copy_ptr_ptr = tmp_stack_ptr;
+ goto retry_find_and_lock_next_base_node;
+ } else {
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
+ }
+ return base_node;
+}
+
+static ERTS_INLINE
+void unlock_and_release_locked_base_node_stack(DbTable *tbl,
+ CATreeNodeStack *locked_base_nodes_stack_ptr)
+{
+ DbTableCATreeNode *current_node;
+ DbTableCATreeBaseNode *base_node;
+ int i;
+ for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
+ current_node = locked_base_nodes_stack_ptr->array[i];
+ base_node = &current_node->baseOrRoute.base;
+ if (locked_base_nodes_stack_ptr->pos > 1) {
+ base_node->lock_statistics = /* This is not atomic which is fine as */
+ base_node->lock_statistics + /* correctness does not depend on that. */
+ ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION;
+ }
+ runlock_base_node(base_node);
+ }
+ if (locked_base_nodes_stack_ptr->size > STACK_NEED) {
+ erts_db_free(ERTS_ALC_T_DB_STK, tbl,
+ locked_base_nodes_stack_ptr->array,
+ sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size);
+ }
+}
+
+static ERTS_INLINE
+void init_stack(CATreeNodeStack *stack,
+ DbTableCATreeNode **stack_array,
+ Uint init_size)
+{
+ stack->array = stack_array;
+ stack->pos = 0;
+ stack->size = init_size;
+}
+
+static ERTS_INLINE
+void init_tree_stack(DbTreeStack *stack,
+ TreeDbTerm **stack_array,
+ Uint init_slot)
+{
+ stack->array = stack_array;
+ stack->pos = 0;
+ stack->slot = init_slot;
+}
+
+#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \
+ CATreeNodeStack STACK_NAME; \
+ CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \
+ DbTableCATreeNode *STACK_NAME##_array[STACK_NEED];
+
+#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \
+DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \
+DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \
+DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \
+init_stack(&search_stack, search_stack_array, 0); \
+init_stack(&search_stack_copy, search_stack_copy_array, 0); \
+init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/
+
+/*
+ * Locks and returns the base node that contains the specified key if
+ * it is present. The function saves the search path to the found base
+ * node in search_stack_ptr and adds the found base node to
+ * locked_base_nodes_stack_ptr.
+ */
+static ERTS_INLINE DbTableCATreeBaseNode *
+lock_base_node_with_key(DbTable *tbl,
+ Eterm key,
+ CATreeNodeStack * search_stack_ptr,
+ CATreeNodeStack * locked_base_nodes_stack_ptr)
+{
+ int retry;
+ DbTableCATreeNode *current_node;
+ DbTableCATreeBaseNode *base_node;
+ DbTableCATree* tb = &tbl->catree;
+ DbTableCommon* common_table_data = &tbl->common;
+ do {
+ retry = 0;
+ current_node = GET_ROOT_ACQB(tb);
+ while ( ! current_node->is_base_node ) {
+ PUSH_NODE(search_stack_ptr, current_node);
+ if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
+ current_node = GET_LEFT_ACQB(current_node);
+ } else {
+ current_node = GET_RIGHT_ACQB(current_node);
+ }
+ }
+ base_node = &current_node->baseOrRoute.base;
+ rlock_base_node(base_node);
+ if ( ! base_node->is_valid ) {
+ /* Retry */
+ runlock_base_node(base_node);
+ retry = 1;
+ }
+ } while(retry);
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
+ return base_node;
+}
+
+/*
+ * Joins a base node with it's right neighbor. Returns the base node
+ * resulting from the join in locked state or NULL if there is no base
+ * node to join with.
+ */
+static DbTableCATreeNode*
+erl_db_catree_force_join_right(DbTableCommon *common_table_data,
+ DbTableCATreeNode *parent_container,
+ DbTableCATreeNode *base_container,
+ DbTableCATreeNode **result_parent_wb)
+{
+ DbTableCATreeRouteNode *parent;
+ DbTableCATreeNode *gparent_container;
+ DbTableCATreeRouteNode *gparent;
+ DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
+ DbTableCATree *tb = (DbTableCATree *)common_table_data;
+ DbTableCATreeNode *neighbor_base_container;
+ DbTableCATreeBaseNode *neighbor_base;
+ DbTableCATreeNode *new_neighbor_base;
+ DbTableCATreeNode *neighbor_base_parent;
+ int neighbour_not_valid;
+ if (parent_container == NULL) {
+ return NULL;
+ }
+ parent = &parent_container->baseOrRoute.route;
+ do {
+ neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
+ neighbor_base = &neighbor_base_container->baseOrRoute.base;
+ wlock_base_node_no_stats(neighbor_base);
+ neighbour_not_valid = !neighbor_base->is_valid;
+ if (neighbour_not_valid) {
+ wunlock_base_node(neighbor_base);
+ }
+ } while (neighbour_not_valid);
+ lock_route_node(parent);
+ parent->is_valid = 0;
+ neighbor_base->is_valid = 0;
+ base->is_valid = 0;
+ gparent = NULL;
+ gparent_container = NULL;
+ do {
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ gparent_container = parent_of(tb, parent_container);
+ if (gparent_container != NULL) {
+ gparent = &gparent_container->baseOrRoute.route;
+ lock_route_node(gparent);
+ } else {
+ gparent = NULL;
+ }
+ } while (gparent != NULL && !gparent->is_valid);
+ if (gparent == NULL) {
+ SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
+ } else if (GET_LEFT(gparent_container) == parent_container) {
+ SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ } else {
+ SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ }
+ unlock_route_node(parent);
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ new_neighbor_base = create_catree_base_node(tb);
+ new_neighbor_base->baseOrRoute.base.root =
+ join_trees(base->root, neighbor_base->root);
+ wlock_base_node_no_stats(&(new_neighbor_base->baseOrRoute.base));
+ neighbor_base_parent = NULL;
+ if (GET_RIGHT(parent_container) == neighbor_base_container) {
+ neighbor_base_parent = gparent_container;
+ } else {
+ neighbor_base_parent =
+ leftmost_route_node(GET_RIGHT(parent_container));
+ }
+ if(neighbor_base_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor_base);
+ } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
+ SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ } else {
+ SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ }
+ wunlock_base_node(base);
+ wunlock_base_node(neighbor_base);
+ /* Free the parent and base */
+ erts_schedule_thr_prgr_later_op(free_catree_routing_node,
+ parent_container,
+ &parent->free_item);
+ erts_schedule_thr_prgr_later_op(free_catree_base_node,
+ base_container,
+ &base->free_item);
+ erts_schedule_thr_prgr_later_op(free_catree_base_node,
+ neighbor_base_container,
+ &neighbor_base->free_item);
+
+ if (parent_container == neighbor_base_container) {
+ *result_parent_wb = gparent_container;
+ } else {
+ *result_parent_wb = neighbor_base_parent;
+ }
+ return new_neighbor_base;
+}
+
+/*
+ * Used to merge together all base nodes for operations such as last
+ * and select_*. Returns the base node resulting from the merge in
+ * locked state.
+ */
+static DbTableCATreeNode *
+merge_to_one_locked_base_node(DbTableCommon * common_table_data)
+{
+ DbTableCATreeNode *parent_container;
+ DbTableCATreeNode *new_parent_container;
+ DbTableCATree *tb = (DbTableCATree *)common_table_data;
+ DbTableCATreeNode *base_container;
+ DbTableCATreeNode *new_base_container;
+ int is_not_valid;
+ /* Find first base node */
+ do {
+ parent_container = NULL;
+ base_container = GET_ROOT_ACQB(tb);
+ while ( ! base_container->is_base_node ) {
+ parent_container = base_container;
+ base_container = GET_LEFT_ACQB(base_container);
+ }
+ wlock_base_node_no_stats(&(base_container->baseOrRoute.base));
+ is_not_valid = ! base_container->baseOrRoute.base.is_valid;
+ if (is_not_valid) {
+ wunlock_base_node(&(base_container->baseOrRoute.base));
+ }
+ } while(is_not_valid);
+ do {
+ new_base_container = erl_db_catree_force_join_right(common_table_data,
+ parent_container,
+ base_container,
+ &new_parent_container);
+ if (new_base_container != NULL) {
+ base_container = new_base_container;
+ parent_container = new_parent_container;
+ }
+ } while(new_base_container != NULL);
+ return base_container;
+}
+
+
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *parent_container,
+ DbTableCATreeNode *base_container)
+{
+ DbTableCATreeRouteNode *parent;
+ DbTableCATreeNode *gparent_container;
+ DbTableCATreeRouteNode *gparent;
+ DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
+ DbTableCATreeNode *neighbor_base_container;
+ DbTableCATreeBaseNode *neighbor_base;
+ DbTableCATreeNode *new_neighbor_base;
+ DbTableCATreeNode *neighbor_base_parent;
+ if (parent_container == NULL) {
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ return;
+ }
+ parent = &parent_container->baseOrRoute.route;
+ if (GET_LEFT(parent_container) == base_container) {
+ neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
+ neighbor_base = &neighbor_base_container->baseOrRoute.base;
+ if (try_wlock_base_node(neighbor_base)) {
+ /* Failed to acquire lock */
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ return;
+ } else if (!neighbor_base->is_valid) {
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ wunlock_base_node(neighbor_base);
+ return;
+ } else {
+ lock_route_node(parent);
+ parent->is_valid = 0;
+ neighbor_base->is_valid = 0;
+ base->is_valid = 0;
+ gparent = NULL;
+ gparent_container = NULL;
+ do {
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ gparent_container = parent_of(tb, parent_container);
+ if (gparent_container != NULL) {
+ gparent = &gparent_container->baseOrRoute.route;
+ lock_route_node(gparent);
+ } else {
+ gparent = NULL;
+ }
+ } while (gparent != NULL && !gparent->is_valid);
+ if (gparent == NULL) {
+ SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
+ } else if (GET_LEFT(gparent_container) == parent_container) {
+ SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ } else {
+ SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ }
+ unlock_route_node(parent);
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ new_neighbor_base = create_catree_base_node(tb);
+ new_neighbor_base->baseOrRoute.base.root =
+ join_trees(base->root, neighbor_base->root);
+ neighbor_base_parent = NULL;
+ if (GET_RIGHT(parent_container) == neighbor_base_container) {
+ neighbor_base_parent = gparent_container;
+ } else {
+ neighbor_base_parent =
+ leftmost_route_node(GET_RIGHT(parent_container));
+ }
+ }
+ } else { /* Symetric case */
+ neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
+ neighbor_base = &neighbor_base_container->baseOrRoute.base;
+ if (try_wlock_base_node(neighbor_base)) {
+ /* Failed to acquire lock */
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ return;
+ } else if (!neighbor_base->is_valid) {
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ wunlock_base_node(neighbor_base);
+ return;
+ } else {
+ lock_route_node(parent);
+ parent->is_valid = 0;
+ neighbor_base->is_valid = 0;
+ base->is_valid = 0;
+ gparent = NULL;
+ gparent_container = NULL;
+ do {
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ gparent_container = parent_of(tb, parent_container);
+ if (gparent_container != NULL) {
+ gparent = &gparent_container->baseOrRoute.route;
+ lock_route_node(gparent);
+ } else {
+ gparent = NULL;
+ }
+ } while (gparent != NULL && !gparent->is_valid);
+ if (gparent == NULL) {
+ SET_ROOT_RELB(tb, GET_LEFT(parent_container));
+ } else if (GET_RIGHT(gparent_container) == parent_container) {
+ SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
+ } else {
+ SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
+ }
+ unlock_route_node(parent);
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ new_neighbor_base = create_catree_base_node(tb);
+ new_neighbor_base->baseOrRoute.base.root =
+ join_trees(neighbor_base->root, base->root);
+ neighbor_base_parent = NULL;
+ if (GET_LEFT(parent_container) == neighbor_base_container) {
+ neighbor_base_parent = gparent_container;
+ } else {
+ neighbor_base_parent =
+ rightmost_route_node(GET_LEFT(parent_container));
+ }
+ }
+ }
+ /* Link in new neighbor and free nodes that are no longer in the tree */
+ if (neighbor_base_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor_base);
+ } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
+ SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ } else {
+ SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ }
+ wunlock_base_node(base);
+ wunlock_base_node(neighbor_base);
+ /* Free the parent and base */
+ erts_schedule_thr_prgr_later_op(free_catree_routing_node,
+ parent_container,
+ &parent->free_item);
+ erts_schedule_thr_prgr_later_op(free_catree_base_node,
+ base_container,
+ &base->free_item);
+ erts_schedule_thr_prgr_later_op(free_catree_base_node,
+ neighbor_base_container,
+ &neighbor_base->free_item);
+}
+
+
+static void split_catree(DbTableCommon *tb,
+ DbTableCATreeNode *parent_container,
+ DbTableCATreeNode *base_container) {
+ TreeDbTerm *splitOutWriteBack;
+ TreeDbTerm *leftWriteBack;
+ TreeDbTerm *rightWriteBack;
+ DbTableCATreeNode *left_base_node;
+ DbTableCATreeNode *right_base_node;
+ DbTableCATreeNode *routing_node_container;
+ DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
+ DbTableCATreeRouteNode *parent;
+ if (parent_container == NULL) {
+ parent = NULL;
+ } else {
+ parent = &parent_container->baseOrRoute.route;
+ }
+
+ if (less_than_two_elements(base->root)) {
+ base->lock_statistics = 0;
+ wunlock_base_node(base);
+ return;
+ } else {
+ /* Split the tree */
+ split_tree(tb,
+ base->root,
+ &splitOutWriteBack,
+ &leftWriteBack,
+ &rightWriteBack);
+ /* Create new base nodes */
+ left_base_node =
+ create_catree_base_node((DbTableCATree*)tb);
+ right_base_node =
+ create_catree_base_node((DbTableCATree*)tb);
+ left_base_node->baseOrRoute.base.root = leftWriteBack;
+ right_base_node->baseOrRoute.base.root = rightWriteBack;
+ routing_node_container = create_catree_route_node(tb,
+ left_base_node,
+ right_base_node,
+ &splitOutWriteBack->dbterm);
+ if (parent == NULL) {
+ SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
+ } else if(GET_LEFT(parent_container) == base_container) {
+ SET_LEFT_RELB(parent_container, routing_node_container);
+ } else {
+ SET_RIGHT_RELB(parent_container, routing_node_container);
+ }
+ base->is_valid = 0;
+ wunlock_base_node(base);
+ erts_schedule_thr_prgr_later_op(free_catree_base_node,
+ base_container,
+ &base->free_item);
+ }
+}
+
+/*
+ * Helper functions for removing the table
+ */
+
+static void catree_add_base_node_to_free_list(
+ DbTableCATree *tb,
+ DbTableCATreeNode *base_node_container)
+{
+ base_node_container->baseOrRoute.base.next =
+ tb->base_nodes_to_free_list;
+ tb->base_nodes_to_free_list = base_node_container;
+}
+
+static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
+{
+ if (tb->base_nodes_to_free_list == NULL) {
+ return; /* List empty */
+ } else {
+ DbTableCATreeNode *first = tb->base_nodes_to_free_list;
+ tb->base_nodes_to_free_list = first->baseOrRoute.base.next;
+ }
+}
+
+static DbTableCATreeNode *catree_first_base_node_from_free_list(
+ DbTableCATree *tb)
+{
+ return tb->base_nodes_to_free_list;
+}
+
+static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left)
+{
+ DbTableCATreeNode *root;
+ DbTableCATreeNode *p;
+ for (;;) {
+ root = POP_NODE(&tb->free_stack_rnodes);
+ if (root == NULL) break;
+ else if(root->is_base_node) {
+ catree_add_base_node_to_free_list(tb, root);
+ break;
+ }
+ for (;;) {
+ if ((GET_LEFT(root) != NULL) &&
+ (p = GET_LEFT(root))->is_base_node) {
+ SET_LEFT(root, NULL);
+ catree_add_base_node_to_free_list(tb, p);
+ } else if ((GET_RIGHT(root) != NULL) &&
+ (p = GET_RIGHT(root))->is_base_node) {
+ SET_RIGHT(root, NULL);
+ catree_add_base_node_to_free_list(tb, p);
+ } else if ((p = GET_LEFT(root)) != NULL) {
+ SET_LEFT(root, NULL);
+ PUSH_NODE(&tb->free_stack_rnodes, root);
+ root = p;
+ } else if ((p = GET_RIGHT(root)) != NULL) {
+ SET_RIGHT(root, NULL);
+ PUSH_NODE(&tb->free_stack_rnodes, root);
+ root = p;
+ } else {
+ free_catree_routing_node(root);
+ if (--num_left >= 0) {
+ break;
+ } else {
+ return num_left; /* Done enough for now */
+ }
+ }
+ }
+ }
+ return num_left;
+}
+
+static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
+{
+ TreeDbTerm *root;
+ TreeDbTerm *p;
+ DbTableCATreeNode *base_node_container =
+ catree_first_base_node_from_free_list(tb);
+ for (;;) {
+ root = POP_NODE(&tb->free_stack_elems);
+ if (root == NULL) break;
+ for (;;) {
+ if ((p = root->left) != NULL) {
+ root->left = NULL;
+ PUSH_NODE(&tb->free_stack_elems, root);
+ root = p;
+ } else if ((p = root->right) != NULL) {
+ root->right = NULL;
+ PUSH_NODE(&tb->free_stack_elems, root);
+ root = p;
+ } else {
+ free_term((DbTable*)tb, root);
+ if (--num_left >= 0) {
+ break;
+ } else {
+ return num_left; /* Done enough for now */
+ }
+ }
+ }
+ }
+ catree_deque_base_node_from_free_list(tb);
+ free_catree_base_node(base_node_container);
+ base_node_container = catree_first_base_node_from_free_list(tb);
+ if (base_node_container != NULL) {
+ PUSH_NODE(&tb->free_stack_elems, base_node_container->baseOrRoute.base.root);
+ }
+ return num_left;
+}
+
+
+/*
+** Initialization function
+*/
+
+void db_initialize_catree(void)
+{
+ return;
+};
+
+/*
+** Table interface routines (i.e., what's called by the bif's)
+*/
+
+int db_create_catree(Process *p, DbTable *tbl)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode *root = create_catree_base_node(tb);
+ tb->deletion = 0;
+ tb->base_nodes_to_free_list = NULL;
+ erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
+ return DB_ERROR_NONE;
+}
+
+static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ DbTableCATreeBaseNode *base_node;
+ int result;
+ DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
+ /* Find first base node */
+ base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->baseOrRoute.base);
+ /* Find next base node until non-empty base node is found */
+ while (base_node != NULL && base_node->root == NULL) {
+ base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
+ }
+ /* Get the return value */
+ result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL);
+ /* Unlock base nodes */
+ unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr);
+ return result;
+}
+
+static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTreeStack next_search_stack;
+ TreeDbTerm *next_search_stack_array[STACK_NEED];
+ DbTableCATreeBaseNode *base_node;
+ int result = 0;
+ DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
+ init_tree_stack(&next_search_stack, next_search_stack_array, 0);
+ /* Lock base node with key */
+ base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack);
+ /* Continue until key is not >= greatest key in base_node */
+ while (base_node != NULL) {
+ result = db_next_tree_common(p, tbl, base_node->root, key,
+ ret, &next_search_stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT) {
+ break;
+ }
+ base_node =
+ find_and_lock_next_base_node_and_path(tbl,
+ &search_stack_ptr,
+ &search_stack_copy_ptr,
+ locked_base_nodes_stack_ptr);
+ }
+ unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack);
+ return result;
+}
+
+static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ int result = db_last_tree_common(p, tbl, base->baseOrRoute.base.root, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+
+}
+
+static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ init_tree_stack(&stack, stack_array, 0);
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_prev_tree_common(p, tbl, base->baseOrRoute.base.root, key, ret, &stack);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
+ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
+ ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
+ db_put_tree_common(&tb->common,
+ &base_node->root,
+ obj,
+ key_clash_fail,
+ NULL);)
+
+static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
+{
+ DbTableCATree *tb = &tbl->catree;
+ Eterm key = GETKEY(&tb->common, tuple_val(obj));
+ return erl_db_catree_do_operation_put(tb,
+ key,
+ obj,
+ key_clash_fail);
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
+ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
+ ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
+ db_get_tree_common(p, &tb->common,
+ base_node->root,
+ key, ret, NULL);)
+
+static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ return erl_db_catree_do_operation_get(tb, key, p, ret);
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
+ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
+ ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
+ db_member_tree_common(&tb->common,
+ base_node->root,
+ key,
+ ret,
+ NULL);)
+
+static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ return erl_db_catree_do_operation_member(tb, key, ret);
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
+ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
+ ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
+ db_get_element_tree_common(p, &tb->common,
+ base_node->root,
+ key, ndex,
+ ret, NULL))
+
+static int db_get_element_catree(Process *p, DbTable *tbl,
+ Eterm key, int ndex, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
+ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
+ ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
+ db_erase_tree_common(tbl,
+ &base_node->root,
+ key,
+ ret,
+ NULL);)
+
+static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
+ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
+ ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
+ db_erase_object_tree_common(tbl,
+ &base_node->root,
+ object,
+ ret,
+ NULL);)
+
+static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ Eterm key = GETKEY(&tb->common, tuple_val(object));
+ return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
+}
+
+
+static int db_slot_catree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_slot_tree_common(p, tbl, base->baseOrRoute.base.root,
+ slot_term, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_continue_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ continuation, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+
+static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reverse, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ tid, pattern, reverse, ret,
+ NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_count_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_count_continue_tree_common(p, &tb->common,
+ &base->baseOrRoute.base.root,
+ continuation, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_count_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ tid, pattern, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reversed, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_chunk_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ tid, pattern, chunk_size, reversed, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_delete_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ int result;
+ DbTableCATreeNode *base;
+ init_tree_stack(&stack, stack_array, 0);
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_delete_continue_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ continuation, ret, &stack);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ int result;
+ DbTableCATreeNode *base;
+ init_tree_stack(&stack, stack_array, 0);
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_delete_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ tid, pattern, ret, &stack);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_replace_tree_common(p, &tb->common,
+ &base->baseOrRoute.base.root,
+ tid, pattern, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int result;
+ DbTableCATreeNode *base;
+ base = merge_to_one_locked_base_node(&tb->common);
+ result = db_select_replace_continue_tree_common(p, &tb->common,
+ &base->baseOrRoute.base.root,
+ continuation, ret, NULL);
+ wunlock_base_node(&(base->baseOrRoute.base));
+ return result;
+}
+
+#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
+ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
+ ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
+ db_take_tree_common(p, tbl,
+ &base_node->root,
+ key, ret, NULL);)
+
+static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
+}
+
+/*
+** Other interface routines (not directly coupled to one bif)
+*/
+
+
+/* Display tree contents (for dump) */
+static void db_print_catree(fmtfn_t to, void *to_arg,
+ int show, DbTable *tbl)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ db_print_tree_common(to, to_arg, show, base->baseOrRoute.base.root, tbl);
+ wunlock_base_node(&(base->baseOrRoute.base));
+}
+
+/* Release all memory occupied by a single table */
+static int db_free_table_catree(DbTable *tbl)
+{
+ while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0)
+ ;
+ return 1;
+}
+
+static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
+{
+ DbTableCATreeNode *first_base_node;
+ DbTableCATree *tb = &tbl->catree;
+ if (!tb->deletion) {
+ tb->deletion = 1;
+ tb->free_stack_elems.array =
+ erts_db_alloc(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ tb->free_stack_elems.pos = 0;
+ tb->free_stack_elems.slot = 0;
+ tb->free_stack_rnodes.array =
+ erts_db_alloc(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ sizeof(DbTableCATreeNode *) * STACK_NEED);
+ tb->free_stack_rnodes.pos = 0;
+ tb->free_stack_rnodes.size = STACK_NEED;
+ PUSH_NODE(&tb->free_stack_rnodes, GET_ROOT(tb));
+ tb->is_routing_nodes_freed = 0;
+ tb->base_nodes_to_free_list = NULL;
+ }
+ if ( ! tb->is_routing_nodes_freed ) {
+ reds = do_free_routing_nodes_catree_cont(tb, reds);
+ if (reds < 0) {
+ return reds; /* Not finished */
+ } else {
+ tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
+ first_base_node = catree_first_base_node_from_free_list(tb);
+ PUSH_NODE(&tb->free_stack_elems, first_base_node->baseOrRoute.base.root);
+ }
+ }
+ while (catree_first_base_node_from_free_list(tb) != NULL) {
+ reds = do_free_base_node_cont(tb, reds);
+ if (reds < 0) {
+ return reds; /* Continue later */
+ }
+ }
+ /* Time to free the main structure*/
+ erts_db_free(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ (void *) tb->free_stack_elems.array,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ erts_db_free(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ (void *) tb->free_stack_rnodes.array,
+ sizeof(DbTableCATreeNode *) * STACK_NEED);
+ return 1;
+}
+
+static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds)
+{
+ reds = db_free_table_continue_catree(tbl, reds);
+ if (reds < 0)
+ return reds;
+ db_create_catree(p, tbl);
+ erts_atomic_set_nob(&tbl->catree.common.nitems, 0);
+ return reds;
+}
+
+
+static void db_foreach_offheap_catree(DbTable *tbl,
+ void (*func)(ErlOffHeap *, void *),
+ void *arg)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
+ db_foreach_offheap_tree_common(base->baseOrRoute.base.root, func, arg);
+ wunlock_base_node(&(base->baseOrRoute.base));
+}
+
+static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
+ DbUpdateHandle *handle)
+{
+ DbTableCATree *tb = &tbl->catree;
+ int res;
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
+ res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
+ if (res == 0) {
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ } else {
+ /* db_finalize_dbterm_catree will unlock */
+ handle->lck = prev_node;
+ handle->lck2 = current_node;
+ handle->current_level = current_level;
+ }
+ return res;
+}
+
+static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
+{
+ DbTableCATree *tb = &(handle->tb->catree);
+ DbTableCATreeNode *prev_node = handle->lck;
+ DbTableCATreeNode *current_node = handle->lck2;
+ int current_level = handle->current_level;
+ DbTableCATreeBaseNode *base_node = &current_node->baseOrRoute.base;
+ db_finalize_dbterm_tree_common(cret, handle, NULL);
+ ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
+ return;
+}
+
+#ifdef ERTS_ENABLE_LOCK_COUNT
+static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
+ DbTableCATreeNode *node,
+ int enable)
+{
+ erts_lcnt_ref_t *lcnt_ref;
+ erts_lock_flags_t lock_type;
+ if (node->is_base_node) {
+ lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt;
+ lock_type = ERTS_LOCK_TYPE_RWMUTEX;
+ } else {
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
+ lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
+ lock_type = ERTS_LOCK_TYPE_MUTEX;
+ }
+ if (enable) {
+ erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name,
+ lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB);
+ } else {
+ erts_lcnt_uninstall(lcnt_ref);
+ }
+}
+
+void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
+{
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable);
+}
+#endif /* ERTS_ENABLE_LOCK_COUNT */
+
+
+#ifdef HARDDEBUG
+
+/*
+ * Not called, but kept as it might come to use
+ */
+static inline int my_check_table_tree(TreeDbTerm *t)
+{
+ int lh, rh;
+ if (t == NULL)
+ return 0;
+ lh = my_check_table_tree(t->left);
+ rh = my_check_table_tree(t->right);
+ if ((rh - lh) != t->balance) {
+ erts_fprintf(stderr, "Invalid tree balance for this node:\n");
+ erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
+ t->balance, t->left, t->right);
+ erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
+ erts_fprintf(stderr,"\n---------------------------------\n");
+ abort();
+ }
+ return ((rh > lh) ? rh : lh) + 1;
+}
+
+#endif
diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h
new file mode 100644
index 0000000000..1f2c7da091
--- /dev/null
+++ b/erts/emulator/beam/erl_db_catree.h
@@ -0,0 +1,91 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1998-2016. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Implementation of ETS ordered_set table type with
+ * fine-grained synchronization.
+ *
+ * Author: Kjell Winblad
+ *
+ * "erl_db_catree.c" contains more details about the implementation.
+ *
+ */
+
+#ifndef _DB_CATREE_H
+#define _DB_CATREE_H
+
+struct DbTableCATreeNode;
+
+typedef struct {
+ erts_rwmtx_t lock; /* The lock for this base node */
+ Sint lock_statistics;
+ int is_valid; /* If this base node is still valid */
+ TreeDbTerm *root; /* The root of the sequential tree */
+ DbTable * tab; /* Table ptr, used when freeing using thread progress */
+ ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */
+ struct DbTableCATreeNode * next; /* Used when gradually deleting */
+} DbTableCATreeBaseNode;
+
+typedef struct {
+ ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */
+ DbTable* tab; /* Table ptr, used when freeing using thread progress */
+ erts_mtx_t lock; /* Used when joining route nodes */
+ int is_valid; /* If this route node is still valid */
+ erts_atomic_t left;
+ erts_atomic_t right;
+ DbTerm key;
+} DbTableCATreeRouteNode;
+
+typedef struct DbTableCATreeNode {
+ int is_base_node;
+ union {
+ DbTableCATreeRouteNode route;
+ DbTableCATreeBaseNode base;
+ } baseOrRoute;
+} DbTableCATreeNode;
+
+typedef struct {
+ Uint pos; /* Current position on stack */
+ Uint size; /* The size of the stack array */
+ DbTableCATreeNode** array; /* The stack */
+} CATreeNodeStack;
+
+typedef struct db_table_catree {
+ DbTableCommon common;
+
+ /* CA Tree-specific fields */
+ erts_atomic_t root; /* The tree root (DbTableCATreeNode*) */
+ Uint deletion; /* Being deleted */
+ DbTreeStack free_stack_elems;/* Used for deletion ...*/
+ CATreeNodeStack free_stack_rnodes;
+ DbTableCATreeNode *base_nodes_to_free_list;
+ int is_routing_nodes_freed;
+} DbTableCATree;
+
+void db_initialize_catree(void);
+
+int db_create_catree(Process *p, DbTable *tbl);
+
+
+#ifdef ERTS_ENABLE_LOCK_COUNT
+void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable);
+#endif
+
+#endif /* _DB_CATREE_H */
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 2eb874b005..f643344477 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -48,34 +48,13 @@
#include "erl_binary.h"
#include "erl_db_tree.h"
+#include "erl_db_tree_util.h"
#define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
#define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems))
-/*
-** A stack of this size is enough for an AVL tree with more than
-** 0xFFFFFFFF elements. May be subject to change if
-** the datatype of the element counter is changed to a 64 bit integer.
-** The Maximal height of an AVL tree is calculated as:
-** h(n) <= 1.4404 * log(n + 2) - 0.328
-** Where n denotes the number of nodes, h(n) the height of the tree
-** with n nodes and log is the binary logarithm.
-*/
-
-#define STACK_NEED 50
#define TREE_MAX_ELEMENTS 0xFFFFFFFFUL
-#define PUSH_NODE(Dtt, Tdt) \
- ((Dtt)->array[(Dtt)->pos++] = Tdt)
-
-#define POP_NODE(Dtt) \
- (((Dtt)->pos) ? \
- (Dtt)->array[--((Dtt)->pos)] : NULL)
-
-#define TOP_NODE(Dtt) \
- ((Dtt->pos) ? \
- (Dtt)->array[(Dtt)->pos - 1] : NULL)
-
#define TOPN_NODE(Dtt, Pos) \
(((Pos) < Dtt->pos) ? \
(Dtt)->array[(Dtt)->pos - ((Pos) + 1)] : NULL)
@@ -89,9 +68,9 @@
/* Obtain table static stack if available. NULL if not.
** Must be released with release_stack()
*/
-static DbTreeStack* get_static_stack(DbTableTree* tb)
+ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb)
{
- if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
+ if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
return &tb->static_stack;
}
return NULL;
@@ -100,13 +79,14 @@ static DbTreeStack* get_static_stack(DbTableTree* tb)
/* Obtain static stack if available, otherwise empty dynamic stack.
** Must be released with release_stack()
*/
-static DbTreeStack* get_any_stack(DbTableTree* tb)
+static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container)
{
DbTreeStack* stack;
- if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) {
- return &tb->static_stack;
+ if (stack_container != NULL &&
+ !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) {
+ return &stack_container->static_stack;
}
- stack = erts_db_alloc(ERTS_ALC_T_DB_STK, (DbTable *) tb,
+ stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb,
sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
stack->pos = 0;
stack->slot = 0;
@@ -114,62 +94,59 @@ static DbTreeStack* get_any_stack(DbTableTree* tb)
return stack;
}
-static void release_stack(DbTableTree* tb, DbTreeStack* stack)
+static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack)
{
- if (stack == &tb->static_stack) {
- ASSERT(erts_atomic_read_nob(&tb->is_stack_busy) == 1);
- erts_atomic_set_relb(&tb->is_stack_busy, 0);
+ if (stack_container != NULL && stack == &stack_container->static_stack) {
+ ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1);
+ erts_atomic_set_relb(&stack_container->is_stack_busy, 0);
}
else {
- erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb,
+ erts_db_free(ERTS_ALC_T_DB_STK, tb,
(void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
}
}
-static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
+static ERTS_INLINE void reset_stack(DbTreeStack* stack)
{
- tb->static_stack.pos = 0;
- tb->static_stack.slot = 0;
+ if (stack != NULL) {
+ stack->pos = 0;
+ stack->slot = 0;
+ }
}
-static ERTS_INLINE void free_term(DbTableTree *tb, TreeDbTerm* p)
+static ERTS_INLINE void reset_static_stack(DbTableTree* tb)
{
- db_free_term((DbTable*)tb, p, offsetof(TreeDbTerm, dbterm));
+ if (tb != NULL) {
+ reset_stack(&tb->static_stack);
+ }
}
-static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableTree *tb, Eterm obj)
+static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableCommon *tb, Eterm obj)
{
TreeDbTerm* p;
- if (tb->common.compress) {
- p = db_store_term_comp(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
+ if (tb->compress) {
+ p = db_store_term_comp(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
}
else {
- p = db_store_term(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj);
+ p = db_store_term(tb, NULL, offsetof(TreeDbTerm,dbterm), obj);
}
return p;
}
-static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old,
+static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableCommon *tb, TreeDbTerm* old,
Eterm obj)
{
TreeDbTerm* p;
ASSERT(old != NULL);
- if (tb->common.compress) {
- p = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
+ if (tb->compress) {
+ p = db_store_term_comp(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
}
else {
- p = db_store_term(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
+ p = db_store_term(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj);
}
return p;
}
/*
-** Some macros for "direction stacks"
-*/
-#define DIR_LEFT 0
-#define DIR_RIGHT 1
-#define DIR_END 2
-
-/*
* Number of records to delete before trapping.
*/
#define DELETE_RECORD_LIMIT 12000
@@ -262,7 +239,9 @@ struct select_count_context {
*/
struct select_delete_context {
Process *p;
- DbTableTree *tb;
+ DbTableCommon *tb;
+ TreeDbTerm **root;
+ DbTreeStack *stack;
Uint accum;
Binary *mp;
Eterm end_condition;
@@ -277,7 +256,8 @@ struct select_delete_context {
*/
struct select_replace_context {
Process *p;
- DbTableTree *tb;
+ DbTableCommon *tb;
+ TreeDbTerm **root;
Binary *mp;
Eterm end_condition;
Eterm *lastobj;
@@ -292,73 +272,82 @@ typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Ete
/*
** Forward declarations
*/
-static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key);
-static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
- Eterm object);
+static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm key, DbTreeStack *stack);
+static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm object, DbTableTree *stack);
static SWord do_free_tree_continue(DbTableTree *tb, SWord reds);
-static void free_term(DbTableTree *tb, TreeDbTerm* p);
-static int balance_left(TreeDbTerm **this);
-static int balance_right(TreeDbTerm **this);
+static void free_term(DbTable *tb, TreeDbTerm* p);
+int tree_balance_left(TreeDbTerm **this);
+int tree_balance_right(TreeDbTerm **this);
static int delsub(TreeDbTerm **this);
-static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot);
-static TreeDbTerm *find_node(DbTableTree *tb, Eterm key);
-static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key);
-static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack*, TreeDbTerm *this);
-static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key);
-static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key);
-static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*,
- Eterm key);
-static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack*,
- Eterm key);
-static void traverse_backwards(DbTableTree *tb,
+static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot,
+ DbTable *tb, DbTableTree *stack_container);
+static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
+ Eterm key, DbTableTree *stack_container);
+static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key);
+static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack *stack, TreeDbTerm *this);
+static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key);
+static void traverse_backwards(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
+ int (*doit)(DbTableCommon *,
TreeDbTerm *,
void *,
int),
void *context);
-static void traverse_forward(DbTableTree *tb,
+static void traverse_forward(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
+ int (*doit)(DbTableCommon *tb,
TreeDbTerm *,
void *,
int),
void *context);
-static void traverse_update_backwards(DbTableTree *tb,
+static void traverse_update_backwards(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack*,
Eterm lastkey,
- int (*doit)(DbTableTree *tb,
+ int (*doit)(DbTableCommon *tb,
TreeDbTerm **, // out
void *,
int),
void *context);
-static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret,
- Eterm *partly_bound_key);
+static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+ TreeDbTerm ***ret, Eterm *partly_bound);
static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
-static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi);
-static int doit_select(DbTableTree *tb,
+static int doit_select(DbTableCommon *tb,
TreeDbTerm *this,
void *ptr,
int forward);
-static int doit_select_count(DbTableTree *tb,
+static int doit_select_count(DbTableCommon *tb,
TreeDbTerm *this,
void *ptr,
int forward);
-static int doit_select_chunk(DbTableTree *tb,
+static int doit_select_chunk(DbTableCommon *tb,
TreeDbTerm *this,
void *ptr,
int forward);
-static int doit_select_delete(DbTableTree *tb,
+static int doit_select_delete(DbTableCommon *tb,
TreeDbTerm *this,
void *ptr,
int forward);
-static int doit_select_replace(DbTableTree *tb,
+static int doit_select_replace(DbTableCommon *tb,
TreeDbTerm **this_ptr,
void *ptr,
int forward);
@@ -508,18 +497,18 @@ int db_create_tree(Process *p, DbTable *tbl)
return DB_ERROR_NONE;
}
-static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
+int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
TreeDbTerm *this;
- if (( this = tb->root ) == NULL) {
+ if (( this = root ) == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
/* Walk down the tree to the left */
- if ((stack = get_static_stack(tb)) != NULL) {
+ if ((stack = get_static_stack(stack_container)) != NULL) {
stack->pos = stack->slot = 0;
}
while (this->left != NULL) {
@@ -529,23 +518,27 @@ static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
if (stack) {
PUSH_NODE(stack, this);
stack->slot = 1;
- release_stack(tb,stack);
+ release_stack(tbl,stack_container,stack);
}
*ret = db_copy_key(p, tbl, &this->dbterm);
return DB_ERROR_NONE;
}
-static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
- DbTreeStack* stack;
+ return db_first_tree_common(p, tbl, tb->root, ret, tb);
+}
+
+int db_next_tree_common(Process *p, DbTable *tbl,
+ TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack)
+{
TreeDbTerm *this;
if (is_atom(key) && key == am_EOT)
return DB_ERROR_BADKEY;
- stack = get_any_stack(tb);
- this = find_next(tb, stack, key);
- release_stack(tb,stack);
+ this = find_next(&tbl->common, root, stack, key);
if (this == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
@@ -554,18 +547,27 @@ static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
+static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack = get_any_stack(tbl, tb);
+ int ret_val = db_next_tree_common(p, tbl, tb->root, key, ret, stack);
+ release_stack(tbl,tb,stack);
+ return ret_val;
+}
+
+int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container)
+{
TreeDbTerm *this;
DbTreeStack* stack;
- if (( this = tb->root ) == NULL) {
+ if (( this = root ) == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
/* Walk down the tree to the right */
- if ((stack = get_static_stack(tb)) != NULL) {
+ if ((stack = get_static_stack(stack_container)) != NULL) {
stack->pos = stack->slot = 0;
}
while (this->right != NULL) {
@@ -574,24 +576,27 @@ static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
}
if (stack) {
PUSH_NODE(stack, this);
- stack->slot = NITEMS(tb);
- release_stack(tb,stack);
+ stack->slot = NITEMS(tbl);
+ release_stack(tbl,stack_container,stack);
}
*ret = db_copy_key(p, tbl, &this->dbterm);
return DB_ERROR_NONE;
}
-static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_last_tree_common(p, tbl, tb->root, ret, tb);
+}
+
+int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack)
+{
TreeDbTerm *this;
- DbTreeStack* stack;
if (is_atom(key) && key == am_EOT)
return DB_ERROR_BADKEY;
- stack = get_any_stack(tb);
- this = find_prev(tb, stack, key);
- release_stack(tb,stack);
+ this = find_prev(&tbl->common, root, stack, key);
if (this == NULL) {
*ret = am_EOT;
return DB_ERROR_NONE;
@@ -600,25 +605,30 @@ static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static ERTS_INLINE Sint cmp_key(DbTableTree* tb, Eterm key, TreeDbTerm* obj) {
- return CMP(key, GETKEY(tb,obj->dbterm.tpl));
+static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack = get_any_stack(tbl, tb);
+ int res = db_prev_tree_common(p, tbl, tb->root, key, ret, stack);
+ release_stack(tbl,tb,stack);
+ return res;
}
-static ERTS_INLINE int cmp_key_eq(DbTableTree* tb, Eterm key, TreeDbTerm* obj) {
+static ERTS_INLINE int cmp_key_eq(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) {
Eterm obj_key = GETKEY(tb,obj->dbterm.tpl);
return is_same(key, obj_key) || CMP(key, obj_key) == 0;
}
-static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
+int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj,
+ int key_clash_fail, DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
/* Non recursive insertion in AVL tree, building our own stack */
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
Eterm key;
int dir;
@@ -626,14 +636,14 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
key = GETKEY(tb, tuple_val(obj));
- reset_static_stack(tb);
+ reset_static_stack(stack_container);
dstack[dpos++] = DIR_END;
for (;;)
if (!*this) { /* Found our place */
state = 1;
- if (erts_atomic_inc_read_nob(&tb->common.nitems) >= TREE_MAX_ELEMENTS) {
- erts_atomic_dec_nob(&tb->common.nitems);
+ if (erts_atomic_inc_read_nob(&tb->nitems) >= TREE_MAX_ELEMENTS) {
+ erts_atomic_dec_nob(&tb->nitems);
return DB_ERROR_SYSRES;
}
*this = new_dbterm(tb, obj);
@@ -724,9 +734,15 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
return DB_ERROR_NONE;
}
-static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableTree *tb = &tbl->tree;
+ return db_put_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb);
+}
+
+int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTableTree *stack_container)
+{
Eterm copy;
Eterm *hp, *hend;
TreeDbTerm *this;
@@ -737,13 +753,13 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
* The list created around it is purely for interface conformance.
*/
- this = find_node(tb,key);
+ this = find_node(tb,root,key,stack_container);
if (this == NULL) {
*ret = NIL;
} else {
hp = HAlloc(p, this->dbterm.size + 2);
hend = hp + this->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common, &this->dbterm, &hp, &MSO(p));
+ copy = db_copy_object_from_ets(tb, &this->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p,hend,hp);
@@ -751,18 +767,28 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
-static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
+static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_get_tree_common(p, &tb->common, tb->root, key, ret, tb);
+}
- *ret = (find_node(tb,key) == NULL) ? am_false : am_true;
+int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret,
+ DbTableTree *stack_container)
+{
+ *ret = (find_node(tb,root,key,stack_container) == NULL) ? am_false : am_true;
return DB_ERROR_NONE;
}
-static int db_get_element_tree(Process *p, DbTable *tbl,
- Eterm key, int ndex, Eterm *ret)
+static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_member_tree_common(&tb->common, tb->root, key, ret, tb);
+}
+
+int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ int ndex, Eterm *ret, DbTableTree *stack_container)
+{
/*
* Look the node up:
*/
@@ -776,49 +802,68 @@ static int db_get_element_tree(Process *p, DbTable *tbl,
* around the element here either.
*/
- this = find_node(tb,key);
+ this = find_node(tb,root,key,stack_container);
if (this == NULL) {
return DB_ERROR_BADKEY;
} else {
if (ndex > arityval(this->dbterm.tpl[0])) {
return DB_ERROR_BADPARAM;
}
- *ret = db_copy_element_from_ets(&tb->common, p, &this->dbterm, ndex, &hp, 0);
+ *ret = db_copy_element_from_ets(tb, p, &this->dbterm, ndex, &hp, 0);
}
return DB_ERROR_NONE;
}
-static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
+static int db_get_element_tree(Process *p, DbTable *tbl,
+ Eterm key, int ndex, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_get_element_tree_common(p, &tb->common, tb->root, key,
+ ndex, ret, tb);
+}
+
+int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */)
+{
TreeDbTerm *res;
*ret = am_true;
- if ((res = linkout_tree(tb, key)) != NULL) {
- free_term(tb, res);
+ if ((res = linkout_tree(&tbl->common, root,key, stack)) != NULL) {
+ free_term(tbl, res);
}
return DB_ERROR_NONE;
}
-static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
+static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_erase_tree_common(tbl, &tb->root, key, ret, &tb->static_stack);
+}
+
+int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
+ Eterm *ret, DbTableTree *stack_container)
+{
TreeDbTerm *res;
*ret = am_true;
- if ((res = linkout_object_tree(tb, object)) != NULL) {
- free_term(tb, res);
+ if ((res = linkout_object_tree(&tbl->common, root, object, stack_container)) != NULL) {
+ free_term(tbl, res);
}
return DB_ERROR_NONE;
}
-
-static int db_slot_tree(Process *p, DbTable *tbl,
- Eterm slot_term, Eterm *ret)
+static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_erase_object_tree_common(tbl, &tb->root, object, ret, tb);
+}
+
+int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm slot_term, Eterm *ret,
+ DbTableTree *stack_container)
+{
Sint slot;
TreeDbTerm *st;
Eterm *hp, *hend;
@@ -834,10 +879,10 @@ static int db_slot_tree(Process *p, DbTable *tbl,
if (is_not_small(slot_term) ||
((slot = signed_val(slot_term)) < 0) ||
- (slot > NITEMS(tb)))
+ (slot > NITEMS(tbl)))
return DB_ERROR_BADPARAM;
- if (slot == NITEMS(tb)) {
+ if (slot == NITEMS(tbl)) {
*ret = am_EOT;
return DB_ERROR_NONE;
}
@@ -847,20 +892,27 @@ static int db_slot_tree(Process *p, DbTable *tbl,
* are counted from 1 and up.
*/
++slot;
- st = slot_search(p, tb, slot);
+ st = slot_search(p, root, slot, tbl, stack_container);
if (st == NULL) {
*ret = am_false;
return DB_ERROR_UNSPEC;
}
hp = HAlloc(p, st->dbterm.size + 2);
hend = hp + st->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common, &st->dbterm, &hp, &MSO(p));
+ copy = db_copy_object_from_ets(&tbl->common, &st->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p,hend,hp);
return DB_ERROR_NONE;
}
+static int db_slot_tree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb);
+}
+
static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3)
@@ -926,19 +978,14 @@ static BIF_RETTYPE bif_trap3(Export *bif,
{
BIF_TRAP3(bif, p, p1, p2, p3);
}
-
-/*
-** This is called either when the select bif traps or when ets:select/1
-** is called. It does mostly the same as db_select_tree and may in either case
-** trap to itself again (via the ets:select/1 bif).
-** Note that this is common for db_select_tree and db_select_chunk_tree.
-*/
-static int db_select_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+
+int db_select_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_context sc;
unsigned sz;
@@ -980,26 +1027,26 @@ static int db_select_continue_tree(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.chunk_size = chunk_size;
reverse = unsigned_val(tptr[7]);
sc.got = signed_val(tptr[8]);
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb, stack_container);
if (chunk_size) {
if (reverse) {
- traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
} else {
- traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
+ traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
}
} else {
if (reverse) {
- traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+ traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
} else {
- traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
}
}
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
@@ -1082,13 +1129,28 @@ static int db_select_continue_tree(Process *p,
#undef RET_TO_BIF
}
+
+/*
+** This is called either when the select bif traps or when ets:select/1
+** is called. It does mostly the same as db_select_tree and may in either case
+** trap to itself again (via the ets:select/1 bif).
+** Note that this is common for db_select_tree and db_select_chunk_tree.
+*/
+static int db_select_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_select_continue_tree_common(p, &tb->common, &tb->root,
+ continuation, ret, tb);
+}
-
-static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, int reverse, Eterm *ret)
+int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, int reverse, Eterm *ret,
+ DbTableTree *stack_container)
{
/* Strategy: Traverse backwards to build resulting list from tail to head */
- DbTableTree *tb = &tbl->tree;
DbTreeStack* stack;
struct select_context sc;
struct mp_info mpi;
@@ -1117,11 +1179,11 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.got = 0;
sc.chunk_size = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1138,25 +1200,25 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
RET_TO_BIF(sc.accum,DB_ERROR_NONE);
}
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
+ if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.most;
}
- traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+ traverse_forward(tb, root, stack, lastkey, &doit_select, &sc);
} else {
if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc);
}
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
#ifdef HARDDEBUG
erts_fprintf(stderr,"Least: %T\n", mpi.least);
erts_fprintf(stderr,"Most: %T\n", mpi.most);
@@ -1192,16 +1254,21 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
}
-
-/*
-** This is called either when the select_count bif traps.
-*/
-static int db_select_count_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int db_select_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reverse, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_tree_common(p, &tb->common, &tb->root, tid,
+ pattern, reverse, ret, tb);
+}
+
+int db_select_count_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container)
+{
DbTreeStack* stack;
struct select_count_context sc;
unsigned sz;
@@ -1238,16 +1305,16 @@ static int db_select_count_continue_tree(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
if (is_big(tptr[5])) {
sc.got = big_to_uint32(tptr[5]);
} else {
sc.got = unsigned_val(tptr[5]);
}
- stack = get_any_stack(tb);
- traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
- release_stack(tb,stack);
+ stack = get_any_stack((DbTable*)tb, stack_container);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
@@ -1285,11 +1352,24 @@ static int db_select_count_continue_tree(Process *p,
#undef RET_TO_BIF
}
-
-static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+/*
+** This is called either when the select_count bif traps.
+*/
+static int db_select_count_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_count_continue_tree_common(p, &tb->common, &tb->root,
+ continuation, ret, tb);
+}
+
+
+int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container)
+{
DbTreeStack* stack;
struct select_count_context sc;
struct mp_info mpi;
@@ -1318,10 +1398,10 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.got = 0;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1338,16 +1418,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
}
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb, stack_container);
if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
- release_stack(tb,stack);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
@@ -1383,12 +1463,20 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Sint chunk_size,
- int reverse,
- Eterm *ret)
+static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_count_tree_common(p, &tb->common, &tb->root,
+ tid, pattern, ret, tb);
+}
+
+
+int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Sint chunk_size,
+ int reverse, Eterm *ret,
+ DbTableTree *stack_container)
+{
DbTreeStack* stack;
struct select_context sc;
struct mp_info mpi;
@@ -1417,11 +1505,11 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.got = 0;
sc.chunk_size = chunk_size;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1443,25 +1531,25 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
}
}
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb,stack_container);
if (reverse) {
if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
+ traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc);
} else {
if (mpi.some_limitation) {
- if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
+ if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.most;
}
- traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
+ traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc);
}
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0 || sc.got == chunk_size) {
@@ -1530,15 +1618,25 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
}
-/*
-** This is called when select_delete traps
-*/
-static int db_select_delete_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reverse,
+ Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_chunk_tree_common(p, &tb->common, &tb->root,
+ tid, pattern, chunk_size,
+ reverse, ret, tb);
+}
+
+
+int db_select_delete_continue_tree_common(Process *p,
+ DbTable *tbl,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTreeStack* stack)
+{
struct select_delete_context sc;
unsigned sz;
Eterm *hp;
@@ -1552,7 +1650,7 @@ static int db_select_delete_continue_tree(Process *p,
#define RET_TO_BIF(Term, State) do { \
if (sc.erase_lastterm) { \
- free_term(tb, sc.lastterm); \
+ free_term(tbl, sc.lastterm); \
} \
*ret = (Term); \
return State; \
@@ -1571,7 +1669,9 @@ static int db_select_delete_continue_tree(Process *p,
mp = erts_db_get_match_prog_binary_unchecked(tptr[4]);
sc.p = p;
- sc.tb = tb;
+ sc.tb = &tbl->common;
+ sc.root = root;
+ sc.stack = stack;
if (is_big(tptr[5])) {
sc.accum = big_to_uint32(tptr[5]);
} else {
@@ -1580,17 +1680,16 @@ static int db_select_delete_continue_tree(Process *p,
sc.mp = mp;
sc.end_condition = NIL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tbl->common.keypos;
- ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
- traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
}
- key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
if (end_condition != NIL &&
cmp_partly_bound(end_condition,key) > 0) { /* done anyway */
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
@@ -1620,10 +1719,24 @@ static int db_select_delete_continue_tree(Process *p,
#undef RET_TO_BIF
}
-static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+static int db_select_delete_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy));
+ return db_select_delete_continue_tree_common(p, tbl, &tb->root,
+ continuation, ret,
+ &tb->static_stack);
+}
+
+int db_select_delete_tree_common(Process *p, DbTable *tbl,
+ TreeDbTerm **root,
+ Eterm tid, Eterm pattern,
+ Eterm *ret,
+ DbTreeStack* stack)
+{
struct select_delete_context sc;
struct mp_info mpi;
Eterm lastkey = THE_NON_VALUE;
@@ -1641,7 +1754,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
erts_bin_free(mpi.mp); \
} \
if (sc.erase_lastterm) { \
- free_term(tb, sc.lastterm); \
+ free_term(tbl, sc.lastterm); \
} \
*ret = (Term); \
return RetVal; \
@@ -1655,10 +1768,12 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
sc.p = p;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
- sc.tb = tb;
+ sc.keypos = tbl->common.keypos;
+ sc.tb = &tbl->common;
+ sc.root = root;
+ sc.stack = stack;
- if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(0,errcode);
}
@@ -1671,26 +1786,26 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
if (!mpi.got_partial && mpi.some_limitation &&
CMP_EQ(mpi.least,mpi.most)) {
- doit_select_delete(tb,*(mpi.save_term),&sc, 0 /* direction doesn't
+ doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't
matter */);
RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
}
if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, &tb->static_stack, mpi.most)) != NULL) {
- lastkey = GETKEY(tb, this->dbterm.tpl);
+ if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) {
+ lastkey = GETKEY(&tbl->common, this->dbterm.tpl);
}
sc.end_condition = mpi.least;
}
- traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+ traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc);
BUMP_REDS(p, 1000 - sc.max);
if (sc.max > 0) {
RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
}
- key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl);
sz = size_object(key);
if (IS_USMALL(0, sc.accum)) {
hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6);
@@ -1714,7 +1829,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
/* Don't free mpi.mp, so don't use macro */
if (sc.erase_lastterm) {
- free_term(tb, sc.lastterm);
+ free_term(tbl, sc.lastterm);
}
*ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation);
return DB_ERROR_NONE;
@@ -1723,12 +1838,21 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_select_replace_continue_tree(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret,
+ &tb->static_stack);
+}
+
+int db_select_replace_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container)
+{
DbTreeStack* stack;
struct select_replace_context sc;
unsigned sz;
@@ -1764,7 +1888,7 @@ static int db_select_replace_continue_tree(Process *p,
sc.end_condition = NIL;
sc.lastobj = NULL;
sc.max = 1000;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
if (is_big(tptr[5])) {
sc.replaced = big_to_uint32(tptr[5]);
} else {
@@ -1772,9 +1896,9 @@ static int db_select_replace_continue_tree(Process *p,
}
prev_replaced = sc.replaced;
- stack = get_any_stack(tb);
- traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc);
- release_stack(tb,stack);
+ stack = get_any_stack((DbTable*)tb,stack_container);
+ traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
+ release_stack((DbTable*)tb,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced)));
@@ -1813,10 +1937,20 @@ static int db_select_replace_continue_tree(Process *p,
#undef RET_TO_BIF
}
-static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
- Eterm pattern, Eterm *ret)
+static int db_select_replace_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_replace_continue_tree_common(p, &tb->common, &tb->root,
+ continuation, ret, tb);
+}
+
+int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container)
+{
DbTreeStack* stack;
struct select_replace_context sc;
struct mp_info mpi;
@@ -1844,12 +1978,13 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
sc.lastobj = NULL;
sc.p = p;
sc.tb = tb;
+ sc.root = root;
sc.max = 1000;
sc.end_condition = NIL;
- sc.keypos = tb->common.keypos;
+ sc.keypos = tb->keypos;
sc.replaced = 0;
- if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
+ if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) {
RET_TO_BIF(NIL,errcode);
}
@@ -1860,7 +1995,7 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
sc.mp = mpi.mp;
- stack = get_static_stack(tb);
+ stack = get_static_stack(stack_container);
if (!mpi.got_partial && mpi.some_limitation &&
CMP_EQ(mpi.least,mpi.most)) {
TreeDbTerm* term = *(mpi.save_term);
@@ -1869,23 +2004,23 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
if (TOP_NODE(stack) == term)
// throw away potentially invalid reference
REPLACE_TOP_NODE(stack, *(mpi.save_term));
- release_stack(tb, stack);
+ release_stack((DbTable*)tb,stack_container, stack);
}
RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE);
}
if (stack == NULL)
- stack = get_any_stack(tb);
+ stack = get_any_stack((DbTable*)tb,stack_container);
if (mpi.some_limitation) {
- if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) {
lastkey = GETKEY(tb, this->dbterm.tpl);
}
sc.end_condition = mpi.least;
}
- traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc);
- release_stack(tb,stack);
+ traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc);
+ release_stack((DbTable*)tb,stack_container,stack);
// the more objects we've replaced, the more reductions we've consumed
BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced));
if (sc.max > 0) {
@@ -1922,52 +2057,73 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
}
-static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
{
DbTableTree *tb = &tbl->tree;
+ return db_select_replace_tree_common(p, &tb->common, &tb->root,
+ tid, pattern, ret, tb);
+}
+
+int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */)
+{
TreeDbTerm *this;
*ret = NIL;
- this = linkout_tree(tb, key);
+ this = linkout_tree(&tbl->common, root, key, stack);
if (this) {
Eterm copy, *hp, *hend;
hp = HAlloc(p, this->dbterm.size + 2);
hend = hp + this->dbterm.size + 2;
- copy = db_copy_object_from_ets(&tb->common,
+ copy = db_copy_object_from_ets(&tbl->common,
&this->dbterm, &hp, &MSO(p));
*ret = CONS(hp, copy, NIL);
hp += 2;
HRelease(p, hend, hp);
- free_term(tb, this);
+ free_term(tbl, this);
}
return DB_ERROR_NONE;
}
+static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ return db_take_tree_common(p, tbl, &tb->root,
+ key, ret, &tb->static_stack);
+}
+
/*
** Other interface routines (not directly coupled to one bif)
*/
-
-/* Display tree contents (for dump) */
-static void db_print_tree(fmtfn_t to, void *to_arg,
- int show,
- DbTable *tbl)
+void db_print_tree_common(fmtfn_t to, void *to_arg,
+ int show, TreeDbTerm *root, DbTable *tbl)
{
- DbTableTree *tb = &tbl->tree;
#ifdef TREE_DEBUG
if (show)
erts_print(to, to_arg, "\nTree data dump:\n"
"------------------------------------------------\n");
- do_dump_tree2(&tbl->tree, to, to_arg, show, tb->root, 0);
+ do_dump_tree2(&tbl->common, to, to_arg, show, root, 0);
if (show)
erts_print(to, to_arg, "\n"
"------------------------------------------------\n");
#else
- erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tb));
+ erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tbl));
#endif
}
+/* Display tree contents (for dump) */
+static void db_print_tree(fmtfn_t to, void *to_arg,
+ int show,
+ DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+ db_print_tree_common(to, to_arg, show, tb->root, tbl);
+}
+
/* release all memory occupied by a single table */
static int db_free_empty_table_tree(DbTable *tbl)
{
@@ -1992,8 +2148,10 @@ static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds)
(DbTable *) tb,
(void *) tb->static_stack.array,
sizeof(TreeDbTerm *) * STACK_NEED);
- ASSERT(erts_atomic_read_nob(&tb->common.memory_size)
- == sizeof(DbTable));
+ ASSERT((erts_atomic_read_nob(&tb->common.memory_size)
+ == sizeof(DbTable)) ||
+ (erts_atomic_read_nob(&tb->common.memory_size)
+ == (sizeof(DbTable) + sizeof(DbFixation))));
}
return reds;
}
@@ -2012,11 +2170,18 @@ static void do_db_tree_foreach_offheap(TreeDbTerm *,
void (*)(ErlOffHeap *, void *),
void *);
+void db_foreach_offheap_tree_common(TreeDbTerm *root,
+ void (*func)(ErlOffHeap *, void *),
+ void * arg)
+{
+ do_db_tree_foreach_offheap(root, func, arg);
+}
+
static void db_foreach_offheap_tree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void * arg)
{
- do_db_tree_foreach_offheap(tbl->tree.root, func, arg);
+ db_foreach_offheap_tree_common(tbl->tree.root, func, arg);
}
@@ -2041,13 +2206,14 @@ do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
do_db_tree_foreach_offheap(tdbt->right, func, arg);
}
-static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
+static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm key, DbTreeStack *stack) {
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
int dir;
TreeDbTerm *q = NULL;
@@ -2058,7 +2224,7 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
* keep the balance. As in insert, we do the stacking ourselves.
*/
- reset_static_stack(tb);
+ reset_stack(stack);
dstack[dpos++] = DIR_END;
for (;;) {
if (!*this) { /* Failure */
@@ -2084,30 +2250,30 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) {
tstack[tpos++] = this;
state = delsub(this);
}
- erts_atomic_dec_nob(&tb->common.nitems);
+ erts_atomic_dec_nob(&tb->nitems);
break;
}
}
while (state && ( dir = dstack[--dpos] ) != DIR_END) {
this = tstack[--tpos];
if (dir == DIR_LEFT) {
- state = balance_left(this);
+ state = tree_balance_left(this);
} else {
- state = balance_right(this);
+ state = tree_balance_right(this);
}
}
return q;
}
-static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
- Eterm object)
+static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root,
+ Eterm object, DbTableTree *stack)
{
TreeDbTerm **tstack[STACK_NEED];
int tpos = 0;
int dstack[STACK_NEED+1];
int dpos = 0;
int state = 0;
- TreeDbTerm **this = &tb->root;
+ TreeDbTerm **this = root;
Sint c;
int dir;
TreeDbTerm *q = NULL;
@@ -2122,7 +2288,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
key = GETKEY(tb, tuple_val(object));
- reset_static_stack(tb);
+ reset_static_stack(stack);
dstack[dpos++] = DIR_END;
for (;;) {
if (!*this) { /* Failure */
@@ -2136,7 +2302,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
tstack[tpos++] = this;
this = &((*this)->right);
} else { /* Equal key, found the only possible matching object*/
- if (!db_eq(&tb->common,object,&(*this)->dbterm)) {
+ if (!db_eq(tb,object,&(*this)->dbterm)) {
return NULL;
}
q = (*this);
@@ -2151,16 +2317,16 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
tstack[tpos++] = this;
state = delsub(this);
}
- erts_atomic_dec_nob(&tb->common.nitems);
+ erts_atomic_dec_nob(&tb->nitems);
break;
}
}
while (state && ( dir = dstack[--dpos] ) != DIR_END) {
this = tstack[--tpos];
if (dir == DIR_LEFT) {
- state = balance_left(this);
+ state = tree_balance_left(this);
} else {
- state = balance_right(this);
+ state = tree_balance_right(this);
}
}
return q;
@@ -2170,7 +2336,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
** For the select functions, analyzes the pattern and determines which
** part of the tree should be searched. Also compiles the match program
*/
-static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
extra_match_validator_t extra_validator, /* Optional callback */
struct mp_info *mpi)
{
@@ -2231,7 +2397,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
- if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
+ if(extra_validator != NULL && !extra_validator(tb->keypos, match, guard, body)) {
if (buff != sbuff) {
erts_free(ERTS_ALC_T_DB_TMP, buff);
}
@@ -2244,7 +2410,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern,
++i;
partly_bound = NIL;
- res = key_given(tb, tpl, &(mpi->save_term), &partly_bound);
+ res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound);
if ( res >= 0 ) { /* Can match something */
key = 0;
mpi->something_can_match = 1;
@@ -2308,7 +2474,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
PUSH_NODE(&tb->static_stack, root);
root = p;
} else {
- free_term(tb, root);
+ free_term((DbTable*)tb, root);
if (--reds < 0) {
return reds; /* Done enough for now */
}
@@ -2322,7 +2488,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds)
/*
* Deletion helpers
*/
-static int balance_left(TreeDbTerm **this)
+int tree_balance_left(TreeDbTerm **this)
{
TreeDbTerm *p, *p1, *p2;
int b1, b2, h = 1;
@@ -2367,7 +2533,7 @@ static int balance_left(TreeDbTerm **this)
return h;
}
-static int balance_right(TreeDbTerm **this)
+int tree_balance_right(TreeDbTerm **this)
{
TreeDbTerm *p, *p1, *p2;
int b1, b2, h = 1;
@@ -2439,7 +2605,7 @@ static int delsub(TreeDbTerm **this)
h = 1;
while (tpos && h) {
r = tstack[--tpos];
- h = balance_right(r);
+ h = tree_balance_right(r);
}
return h;
}
@@ -2448,11 +2614,13 @@ static int delsub(TreeDbTerm **this)
* Helper for db_slot
*/
-static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
+static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
+ Sint slot, DbTable *tb,
+ DbTableTree *stack_container)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
- DbTreeStack* stack = get_any_stack(tb);
+ DbTreeStack* stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2465,7 +2633,7 @@ static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
stack->pos = 0;
}
if (EMPTY_NODE(stack)) {
- this = tb->root;
+ this = root;
if (this == NULL)
goto done;
while (this->left != NULL){
@@ -2514,7 +2682,7 @@ static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
}
}
done:
- release_stack(tb,stack);
+ release_stack(tb,stack_container,stack);
return this;
}
@@ -2522,7 +2690,8 @@ done:
* Find next and previous in sort order
*/
-static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
+static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key) {
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
@@ -2534,7 +2703,7 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2578,7 +2747,8 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
return this;
}
-static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
+static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key) {
TreeDbTerm *this;
TreeDbTerm *tmp;
Sint c;
@@ -2590,7 +2760,7 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2634,8 +2804,8 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) {
return this;
}
-static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
- Eterm key)
+static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
@@ -2643,7 +2813,7 @@ static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2667,8 +2837,8 @@ static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
}
}
-static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
- Eterm key)
+static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root,
+ DbTreeStack* stack, Eterm key)
{
TreeDbTerm *this;
TreeDbTerm *tmp;
@@ -2676,7 +2846,7 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
/* spool the stack, we have to "re-search" */
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL)
+ if (( this = root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, this);
@@ -2704,16 +2874,17 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
/*
* Just lookup a node
*/
-static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
+static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root,
+ Eterm key, DbTableTree *stack_container)
{
TreeDbTerm *this;
Sint res;
- DbTreeStack* stack = get_static_stack(tb);
+ DbTreeStack* stack = get_static_stack(stack_container);
if(!stack || EMPTY_NODE(stack)
|| !cmp_key_eq(tb, key, (this=TOP_NODE(stack)))) {
- this = tb->root;
+ this = root;
while (this != NULL && (res = cmp_key(tb,key,this)) != 0) {
if (res < 0)
this = this->left;
@@ -2722,7 +2893,7 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
}
}
if (stack) {
- release_stack(tb,stack);
+ release_stack((DbTable*)tb,stack_container,stack);
}
return this;
}
@@ -2730,12 +2901,12 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
/*
* Lookup a node and return the address of the node pointer in the tree
*/
-static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
+static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key)
{
TreeDbTerm **this;
Sint res;
- this = &tb->root;
+ this = root;
while ((*this) != NULL && (res = cmp_key(tb, key, *this)) != 0) {
if (res < 0)
this = &((*this)->left);
@@ -2752,7 +2923,8 @@ static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
* Tries to reuse the existing stack for performance.
*/
-static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *this) {
+static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root,
+ DbTreeStack *stack, TreeDbTerm *this) {
Eterm key = GETKEY(tb, this->dbterm.tpl);
TreeDbTerm *tmp;
TreeDbTerm *parent;
@@ -2765,7 +2937,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
}
}
if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
- if (( tmp = tb->root ) == NULL)
+ if (( tmp = *root ) == NULL)
return NULL;
for (;;) {
PUSH_NODE(stack, tmp);
@@ -2791,7 +2963,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
parent = TOPN_NODE(stack, 1);
if (parent == NULL)
- return ((this != tb->root) ? NULL : &(tb->root));
+ return ((this != *root) ? NULL : root);
if (parent->left == this)
return &(parent->left);
if (parent->right == this)
@@ -2799,12 +2971,11 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th
return NULL;
}
-static int
-db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
- DbUpdateHandle* handle)
+int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm obj, DbUpdateHandle* handle,
+ DbTableTree *stack_container)
{
- DbTableTree *tb = &tbl->tree;
- TreeDbTerm **pp = find_node2(tb, key);
+ TreeDbTerm **pp = find_node2(&tbl->common, root, key);
int flags = 0;
if (pp == NULL) {
@@ -2815,18 +2986,19 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
int arity = arityval(*objp);
Eterm *htop, *hend;
- ASSERT(arity >= tb->common.keypos);
+ ASSERT(arity >= tbl->common.keypos);
htop = HAlloc(p, arity + 1);
hend = htop + arity + 1;
sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1));
- htop[tb->common.keypos] = key;
+ htop[tbl->common.keypos] = key;
obj = make_tuple(htop);
- if (db_put_tree(tbl, obj, 1) != DB_ERROR_NONE) {
+ if (db_put_tree_common(&tbl->common, root,
+ obj, 1, stack_container) != DB_ERROR_NONE) {
return 0;
}
- pp = find_node2(tb, key);
+ pp = find_node2(&tbl->common, root, key);
ASSERT(pp != NULL);
HRelease(p, hend, htop);
flags |= DB_NEW_OBJECT;
@@ -2841,21 +3013,28 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
return 1;
}
-static void
-db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
+static int
+db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
+ DbUpdateHandle* handle)
{
- DbTable *tbl = handle->tb;
DbTableTree *tb = &tbl->tree;
+ return db_lookup_dbterm_tree_common(p, tbl, &tb->root, key, obj, handle, tb);
+}
+
+void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
+ DbTableTree *stack_container)
+{
+ DbTable *tbl = handle->tb;
TreeDbTerm *bp = (TreeDbTerm *) *handle->bp;
if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) {
Eterm ret;
- db_erase_tree(tbl, GETKEY(tb, bp->dbterm.tpl), &ret);
+ db_erase_tree(tbl, GETKEY(&tbl->common, bp->dbterm.tpl), &ret);
} else if (handle->flags & DB_MUST_RESIZE) {
db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm));
- reset_static_stack(tb);
+ reset_static_stack(stack_container);
- free_term(tb, bp);
+ free_term(tbl, bp);
}
#ifdef DEBUG
handle->dbterm = 0;
@@ -2863,13 +3042,22 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
return;
}
+static void
+db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle)
+{
+ DbTable *tbl = handle->tb;
+ DbTableTree *tb = &tbl->tree;
+ db_finalize_dbterm_tree_common(cret, handle, tb);
+}
+
/*
* Traverse the tree with a callback function, used by db_match_xxx
*/
-static void traverse_backwards(DbTableTree *tb,
+static void traverse_backwards(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree *,
+ int (*doit)(DbTableCommon *,
TreeDbTerm *,
void *,
int),
@@ -2879,7 +3067,7 @@ static void traverse_backwards(DbTableTree *tb,
if (lastkey == THE_NON_VALUE) {
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
+ if (( this = *root ) == NULL) {
return;
}
while (this != NULL) {
@@ -2887,15 +3075,15 @@ static void traverse_backwards(DbTableTree *tb,
this = this->right;
}
this = TOP_NODE(stack);
- next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
if (!((*doit)(tb, this, context, 0)))
return;
} else {
- next = find_prev(tb, stack, lastkey);
+ next = find_prev(tb, *root, stack, lastkey);
}
while ((this = next) != NULL) {
- next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
if (!((*doit)(tb, this, context, 0)))
return;
}
@@ -2904,10 +3092,11 @@ static void traverse_backwards(DbTableTree *tb,
/*
* Traverse the tree with a callback function, used by db_match_xxx
*/
-static void traverse_forward(DbTableTree *tb,
+static void traverse_forward(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree *,
+ int (*doit)(DbTableCommon *,
TreeDbTerm *,
void *,
int),
@@ -2917,7 +3106,7 @@ static void traverse_forward(DbTableTree *tb,
if (lastkey == THE_NON_VALUE) {
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
+ if (( this = *root ) == NULL) {
return;
}
while (this != NULL) {
@@ -2925,15 +3114,15 @@ static void traverse_forward(DbTableTree *tb,
this = this->left;
}
this = TOP_NODE(stack);
- next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
if (!((*doit)(tb, this, context, 1)))
return;
} else {
- next = find_next(tb, stack, lastkey);
+ next = find_next(tb, *root, stack, lastkey);
}
while ((this = next) != NULL) {
- next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
if (!((*doit)(tb, this, context, 1)))
return;
}
@@ -2942,10 +3131,11 @@ static void traverse_forward(DbTableTree *tb,
/*
* Traverse the tree with an update callback function, used by db_select_replace
*/
-static void traverse_update_backwards(DbTableTree *tb,
+static void traverse_update_backwards(DbTableCommon *tb,
+ TreeDbTerm **root,
DbTreeStack* stack,
Eterm lastkey,
- int (*doit)(DbTableTree*,
+ int (*doit)(DbTableCommon*,
TreeDbTerm**,
void*,
int),
@@ -2956,7 +3146,7 @@ static void traverse_update_backwards(DbTableTree *tb,
if (lastkey == THE_NON_VALUE) {
stack->pos = stack->slot = 0;
- if (( this = tb->root ) == NULL) {
+ if (( this = *root ) == NULL) {
return;
}
while (this != NULL) {
@@ -2964,23 +3154,23 @@ static void traverse_update_backwards(DbTableTree *tb,
this = this->right;
}
this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, stack, this);
+ this_ptr = find_ptr(tb, root, stack, this);
ASSERT(this_ptr != NULL);
res = (*doit)(tb, this_ptr, context, 0);
REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
if (!res)
return;
} else {
- next = find_prev(tb, stack, lastkey);
+ next = find_prev(tb, *root, stack, lastkey);
}
while ((this = next) != NULL) {
- this_ptr = find_ptr(tb, stack, this);
+ this_ptr = find_ptr(tb, root, stack, this);
ASSERT(this_ptr != NULL);
res = (*doit)(tb, this_ptr, context, 0);
REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
if (!res)
return;
}
@@ -2990,8 +3180,8 @@ static void traverse_update_backwards(DbTableTree *tb,
* Returns 0 if not given 1 if given and -1 on no possible match
* if key is given; *ret is set to point to the object concerned.
*/
-static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret,
- Eterm *partly_bound)
+static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern,
+ TreeDbTerm ***ret, Eterm *partly_bound)
{
TreeDbTerm **this;
Eterm key;
@@ -2999,11 +3189,11 @@ static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret,
ASSERT(ret != NULL);
if (pattern == am_Underscore || db_is_variable(pattern) != -1)
return 0;
- key = db_getkey(tb->common.keypos, pattern);
+ key = db_getkey(tb->keypos, pattern);
if (is_non_value(key))
return -1; /* can't possibly match anything */
if (!db_has_variable(key)) { /* Bound key */
- if (( this = find_node2(tb, key) ) == NULL) {
+ if (( this = find_node2(tb, root, key) ) == NULL) {
return -1;
}
*ret = this;
@@ -3296,7 +3486,7 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
* Callback functions for the different match functions
*/
-static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3314,7 +3504,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2);
+ ret = db_match_dbterm(tb, sc->p,sc->mp, &this->dbterm, &hp, 2);
if (is_value(ret)) {
sc->accum = CONS(hp, ret, sc->accum);
}
@@ -3331,7 +3521,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
int forward)
{
struct select_count_context *sc = (struct select_count_context *) ptr;
@@ -3345,7 +3535,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
++(sc->got);
}
@@ -3355,7 +3545,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
int forward)
{
struct select_context *sc = (struct select_context *) ptr;
@@ -3374,7 +3564,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, &hp, 2);
if (is_value(ret)) {
++(sc->got);
sc->accum = CONS(hp, ret, sc->accum);
@@ -3393,7 +3583,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
}
-static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr,
int forward)
{
struct select_delete_context *sc = (struct select_delete_context *) ptr;
@@ -3401,7 +3591,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
Eterm key;
if (sc->erase_lastterm)
- free_term(tb, sc->lastterm);
+ free_term((DbTable*)tb, sc->lastterm);
sc->erase_lastterm = 0;
sc->lastterm = this;
@@ -3409,10 +3599,10 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
cmp_partly_bound(sc->end_condition,
GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)
return 0;
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0);
if (ret == am_true) {
key = GETKEY(sc->tb, this->dbterm.tpl);
- linkout_tree(sc->tb, key);
+ linkout_tree(sc->tb, sc->root, key, sc->stack);
sc->erase_lastterm = 1;
++sc->accum;
}
@@ -3422,7 +3612,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
return 1;
}
-static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
+static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr,
int forward)
{
struct select_replace_context *sc = (struct select_replace_context *) ptr;
@@ -3436,13 +3626,13 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) {
return 0;
}
- ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &(*this)->dbterm, NULL, 0);
+ ret = db_match_dbterm(tb, sc->p, sc->mp, &(*this)->dbterm, NULL, 0);
if (is_value(ret)) {
TreeDbTerm* new;
TreeDbTerm* old = *this;
#ifdef DEBUG
- Eterm key = db_getkey(tb->common.keypos, ret);
+ Eterm key = db_getkey(tb->keypos, ret);
ASSERT(is_value(key));
ASSERT(cmp_key(tb, key, old) == 0);
#endif
@@ -3452,7 +3642,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
new->balance = old->balance;
sc->lastobj = new->dbterm.tpl;
*this = new;
- free_term(tb, old);
+ free_term((DbTable*)tb, old);
++(sc->replaced);
}
if (--(sc->max) <= 0) {
@@ -3462,7 +3652,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr,
}
#ifdef TREE_DEBUG
-static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show,
+static void do_dump_tree2(DbTableCommon* tb, int to, void *to_arg, int show,
TreeDbTerm *t, int offset)
{
if (t == NULL)
@@ -3471,7 +3661,7 @@ static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show,
if (show) {
const char* prefix;
Eterm term;
- if (tb->common.compress) {
+ if (tb->compress) {
prefix = "key=";
term = GETKEY(tb, t->dbterm.tpl);
}
@@ -3526,7 +3716,7 @@ static void check_slot_pos(DbTableTree *tb)
"element position %d is really 0x%08X, when stack says "
"it's 0x%08X\n", tb->stack.slot, t,
tb->stack.array[tb->stack.pos - 1]);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
}
}
@@ -3541,14 +3731,14 @@ static void check_saved_stack(DbTableTree *tb)
if (t != stack->array[0]) {
erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
stack->array[0], t);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
while (n < stack->pos) {
if (t == NULL) {
erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
" stack depth is %d\n", n);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
n++;
@@ -3562,7 +3752,7 @@ static void check_saved_stack(DbTableTree *tb)
"represent child pointer in tree!"
"(left == 0x%08X, right == 0x%08X\n",
n, tb->stack[n], t->left, t->right);
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
return;
}
}
@@ -3581,7 +3771,7 @@ static int check_table_tree(DbTableTree* tb, TreeDbTerm *t)
erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
t->balance, t->left, t->right);
erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
- do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, t, 0);
+ do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, t, 0);
erts_fprintf(stderr,"\n---------------------------------\n");
}
return ((rh > lh) ? rh : lh) + 1;
diff --git a/erts/emulator/beam/erl_db_tree_util.h b/erts/emulator/beam/erl_db_tree_util.h
new file mode 100644
index 0000000000..fbd9a9124a
--- /dev/null
+++ b/erts/emulator/beam/erl_db_tree_util.h
@@ -0,0 +1,151 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1998-2016. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#ifndef _DB_TREE_UTIL_H
+#define _DB_TREE_UTIL_H
+
+/*
+** Internal functions and macros used by both the CA tree and the AVL tree
+*/
+
+/*
+** A stack of this size is enough for an AVL tree with more than
+** 0xFFFFFFFF elements. May be subject to change if
+** the datatype of the element counter is changed to a 64 bit integer.
+** The Maximal height of an AVL tree is calculated as:
+** h(n) <= 1.4404 * log(n + 2) - 0.328
+** Where n denotes the number of nodes, h(n) the height of the tree
+** with n nodes and log is the binary logarithm.
+*/
+
+#define STACK_NEED 50
+
+#define PUSH_NODE(Dtt, Tdt) \
+ ((Dtt)->array[(Dtt)->pos++] = Tdt)
+
+#define POP_NODE(Dtt) \
+ (((Dtt)->pos) ? \
+ (Dtt)->array[--((Dtt)->pos)] : NULL)
+
+#define TOP_NODE(Dtt) \
+ ((Dtt->pos) ? \
+ (Dtt)->array[(Dtt)->pos - 1] : NULL)
+
+#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL)
+
+static ERTS_INLINE void free_term(DbTable *tb, TreeDbTerm* p)
+{
+ db_free_term(tb, p, offsetof(TreeDbTerm, dbterm));
+}
+
+/*
+** Some macros for "direction stacks"
+*/
+#define DIR_LEFT 0
+#define DIR_RIGHT 1
+#define DIR_END 2
+
+static ERTS_INLINE Sint cmp_key(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) {
+ return CMP(key, GETKEY(tb,obj->dbterm.tpl));
+}
+
+int tree_balance_left(TreeDbTerm **this);
+int tree_balance_right(TreeDbTerm **this);
+
+int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container);
+int db_next_tree_common(Process *p, DbTable *tbl,
+ TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack);
+int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm *ret, DbTableTree *stack_container);
+int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTreeStack* stack);
+int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj,
+ int key_clash_fail, DbTableTree *stack_container);
+int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ Eterm *ret, DbTableTree *stack_container);
+int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key,
+ int ndex, Eterm *ret, DbTableTree *stack_container);
+int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret,
+ DbTableTree *stack_container);
+int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */);
+int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object,
+ Eterm *ret, DbTableTree *stack_container);
+int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root,
+ Eterm slot_term, Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Sint chunk_size,
+ int reverse, Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, int reverse, Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_delete_tree_common(Process *p, DbTable *tbl,
+ TreeDbTerm **root,
+ Eterm tid, Eterm pattern,
+ Eterm *ret,
+ DbTreeStack* stack);
+int db_select_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_delete_continue_tree_common(Process *p,
+ DbTable *tbl,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTreeStack* stack);
+int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_count_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root,
+ Eterm tid, Eterm pattern, Eterm *ret,
+ DbTableTree *stack_container);
+int db_select_replace_continue_tree_common(Process *p,
+ DbTableCommon *tb,
+ TreeDbTerm **root,
+ Eterm continuation,
+ Eterm *ret,
+ DbTableTree *stack_container);
+int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm *ret,
+ DbTreeStack *stack /* NULL if no static stack */);
+void db_print_tree_common(fmtfn_t to, void *to_arg,
+ int show, TreeDbTerm *root, DbTable *tbl);
+void db_foreach_offheap_tree_common(TreeDbTerm *root,
+ void (*func)(ErlOffHeap *, void *),
+ void * arg);
+int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root,
+ Eterm key, Eterm obj, DbUpdateHandle* handle,
+ DbTableTree *stack_container);
+void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle,
+ DbTableTree *stack_container);
+#endif /* _DB_TREE_UTIL_H */
diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h
index 6ec3b4f98f..be74bc795c 100644
--- a/erts/emulator/beam/erl_db_util.h
+++ b/erts/emulator/beam/erl_db_util.h
@@ -90,6 +90,8 @@ typedef struct {
Uint new_size;
int flags;
void* lck;
+ void* lck2;
+ int current_level;
} DbUpdateHandle;
@@ -274,23 +276,26 @@ typedef struct db_table_common {
} DbTableCommon;
/* These are status bit patterns */
-#define DB_PRIVATE (1 << 0)
-#define DB_PROTECTED (1 << 1)
-#define DB_PUBLIC (1 << 2)
-#define DB_DELETE (1 << 3) /* table is being deleted */
-#define DB_SET (1 << 4)
-#define DB_BAG (1 << 5)
-#define DB_DUPLICATE_BAG (1 << 6)
-#define DB_ORDERED_SET (1 << 7)
-#define DB_FINE_LOCKED (1 << 8) /* write_concurrency */
-#define DB_FREQ_READ (1 << 9) /* read_concurrency */
-#define DB_NAMED_TABLE (1 << 10)
-#define DB_BUSY (1 << 11)
+#define DB_PRIVATE (1 << 0)
+#define DB_PROTECTED (1 << 1)
+#define DB_PUBLIC (1 << 2)
+#define DB_DELETE (1 << 3) /* table is being deleted */
+#define DB_SET (1 << 4)
+#define DB_BAG (1 << 5)
+#define DB_DUPLICATE_BAG (1 << 6)
+#define DB_ORDERED_SET (1 << 7)
+#define DB_CA_ORDERED_SET (1 << 8)
+#define DB_FINE_LOCKED (1 << 9) /* write_concurrency */
+#define DB_FREQ_READ (1 << 10) /* read_concurrency */
+#define DB_NAMED_TABLE (1 << 11)
+#define DB_BUSY (1 << 12)
#define IS_HASH_TABLE(Status) (!!((Status) & \
(DB_BAG | DB_SET | DB_DUPLICATE_BAG)))
#define IS_TREE_TABLE(Status) (!!((Status) & \
DB_ORDERED_SET))
+#define IS_CATREE_TABLE(Status) (!!((Status) & \
+ DB_CA_ORDERED_SET))
#define NFIXED(T) (erts_refc_read(&(T)->common.fix_count,0))
#define IS_FIXED(T) (NFIXED(T) != 0)
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 463ae898a3..3b10ef8c25 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -91,6 +91,8 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "db_tab", "address" },
{ "db_tab_fix", "address" },
{ "db_hash_slot", "address" },
+ { "erl_db_catree_base_node", "dynamic" },
+ { "erl_db_catree_route_node", "dynamic" },
{ "resource_monitors", "address" },
{ "driver_list", NULL },
{ "proc_msgq", "pid" },
@@ -707,6 +709,26 @@ erts_lc_get_lock_order_id(char *name)
return (Sint16) -1;
}
+int
+erts_lc_is_check_order(char *name)
+{
+ int i;
+ if (!name || name[0] == '\0')
+ erts_fprintf(stderr, "Missing lock name\n");
+
+ for (i = 0; i < ERTS_LOCK_ORDER_SIZE; i++) {
+ if (sys_strcmp(erts_lock_order[i].name, name) == 0) {
+ if (erts_lock_order[i].internal_order != NULL &&
+ sys_strcmp(erts_lock_order[i].internal_order, "dynamic") == 0) {
+ return 0;
+ }else{
+ return 1;
+ }
+ }
+ }
+ return 1;
+}
+
static int compare_locked_by_id(lc_locked_lock_t *locked_lock, erts_lc_lock_t *comparand)
{
if(locked_lock->id < comparand->id) {
@@ -987,15 +1009,17 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
*/
- /* Check that we are not trying to lock this lock twice */
- for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
- if (tl_lck->id < lck->id
- || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) {
- if (tl_lck->id == lck->id && tl_lck->extra == lck->extra)
- lock_twice("Trylocking", thr, lck, options);
- break;
- }
- }
+ if (lck->check_order) {
+ /* Check that we are not trying to lock this lock twice */
+ for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
+ if (tl_lck->id < lck->id
+ || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) {
+ if (tl_lck->id == lck->id && tl_lck->extra == lck->extra)
+ lock_twice("Trylocking", thr, lck, options);
+ break;
+ }
+ }
+ }
#ifndef ERTS_LC_ALLWAYS_FORCE_BUSY_TRYLOCK_ON_LOCK_ORDER_VIOLATION
/* We only force busy if a lock order violation would occur
@@ -1044,7 +1068,7 @@ void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t
for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
if (tl_lck->id < lck->id
|| (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) {
- if (tl_lck->id == lck->id && tl_lck->extra == lck->extra)
+ if (tl_lck->id == lck->id && tl_lck->extra == lck->extra && lck->check_order)
lock_twice("Trylocking", thr, lck, options);
if (locked) {
ll->next = tl_lck->next;
@@ -1164,9 +1188,10 @@ void erts_lc_lock_flg_x(erts_lc_lock_t *lck, erts_lock_options_t options,
ASSERT(0 < lck->id && lck->id < ERTS_LOCK_ORDER_SIZE);
thr->matrix.m[lck->id][0] = 1;
}
- else if (thr->locked.last->id < lck->id
- || (thr->locked.last->id == lck->id
- && thr->locked.last->extra < lck->extra)) {
+ else if (( ! lck->check_order && thr->locked.last->id == lck->id) ||
+ (thr->locked.last->id < lck->id
+ || (thr->locked.last->id == lck->id
+ && thr->locked.last->extra < lck->extra))) {
lc_locked_lock_t* ll;
if (LOCK_IS_TYPE_ORDER_VIOLATION(lck->flags, thr->locked.last->flags)) {
type_order_violation("locking ", thr, lck);
@@ -1296,7 +1321,7 @@ void
erts_lc_init_lock(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags)
{
lck->id = erts_lc_get_lock_order_id(name);
-
+ lck->check_order = erts_lc_is_check_order(name);
lck->extra = (UWord) &lck->extra;
ASSERT(is_not_immed(lck->extra));
lck->flags = flags;
@@ -1308,6 +1333,7 @@ void
erts_lc_init_lock_x(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags, Eterm extra)
{
lck->id = erts_lc_get_lock_order_id(name);
+ lck->check_order = erts_lc_is_check_order(name);
lck->extra = extra;
ASSERT(is_immed(lck->extra));
lck->flags = flags;
diff --git a/erts/emulator/beam/erl_lock_check.h b/erts/emulator/beam/erl_lock_check.h
index d10e32985a..472e88ad65 100644
--- a/erts/emulator/beam/erl_lock_check.h
+++ b/erts/emulator/beam/erl_lock_check.h
@@ -46,6 +46,7 @@
typedef struct {
int inited;
Sint16 id;
+ int check_order;
erts_lock_flags_t flags;
erts_lock_options_t taken_options;
UWord extra;
@@ -53,11 +54,12 @@ typedef struct {
#define ERTS_LC_INITITALIZED 0x7f7f7f7f
-#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), (F), 0, (X)}
+#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), 1, (F), 0, (X)}
void erts_lc_init(void);
void erts_lc_late_init(void);
Sint16 erts_lc_get_lock_order_id(char *name);
+int erts_lc_is_check_order(char *name);
void erts_lc_check(erts_lc_lock_t *have, int have_len,
erts_lc_lock_t *have_not, int have_not_len);
void erts_lc_check_exact(erts_lc_lock_t *have, int have_len);