aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_catree.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r--erts/emulator/beam/erl_db_catree.c2243
1 files changed, 2243 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
new file mode 100644
index 0000000000..639c7e5e03
--- /dev/null
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -0,0 +1,2243 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+ * Description: Implementation of ETS ordered_set table type with
+ * fine-grained synchronization.
+ *
+ * Author: Kjell Winblad
+ *
+ * This implementation is based on the contention adapting search tree
+ * (CA tree). The CA tree is a concurrent data structure that
+ * dynamically adapts its synchronization granularity based on how
+ * much contention is detected in locks. The following publication
+ * contains a detailed description of CA trees:
+ *
+ * A Contention Adapting Approach to Concurrent Ordered Sets
+ * Journal of Parallel and Distributed Computing, 2018
+ * Kjell Winblad and Konstantinos Sagonas
+ * https://doi.org/10.1016/j.jpdc.2017.11.007
+ *
+ * The following publication may also be interesting as it discusses
+ * how the CA tree can be used as an ETS ordered_set table type
+ * backend:
+ *
+ * More Scalable Ordered Set for ETS Using Adaptation
+ * In Thirteenth ACM SIGPLAN workshop on Erlang (2014)
+ * Kjell Winblad and Konstantinos Sagonas
+ * https://doi.org/10.1145/2633448.2633455
+ *
+ * This implementation of the ordered_set ETS table type is only
+ * activated when the options {write_concurrency, true}, public and
+ * ordered_set are passed to the ets:new/2 function. This
+ * implementation is expected to scale better than the default
+ * implementation (located in "erl_db_tree.c") when concurrent
+ * processes use the following ETS operations to operate on a table:
+ *
+ * delete/2, delete_object/2, first/1, insert/2 (single object),
+ * insert_new/2 (single object), lookup/2, lookup_element/2, member/2,
+ * next/2, take/2 and update_element/3 (single object).
+ *
+ * Currently, the implementation does not have scalable support for
+ * the other operations (e.g., select/2). These operations are handled
+ * by merging all locks so that all terms get protected by a single
+ * lock. This implementation may thus perform worse than the default
+ * implementation in some scenarios. For example, when concurrent
+ * processes access a table with the operations insert/2, delete/2 and
+ * select/2, the insert/2 and delete/2 operations will trigger splits
+ * of locks (to get more fine-grained synchronization) but this will
+ * quickly be undone by the select/2 operation if this operation is
+ * also called frequently.
+ *
+ * The default implementation has a static stack optimization (see
+ * get_static_stack in erl_db_tree.c). This implementation does not
+ * have such an optimization as it induces bad scalability when
+ * concurrent read operations are frequent (they all try to get hold
+ * of the same stack). The default implementation may thus perform
+ * better compared to this implementation in scenarios where the
+ * static stack optimization is useful. One such scenario is when only
+ * one process is accessing the table and this process is traversing
+ * the table with a sequence of next/2 calls.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+#include "erl_vm.h"
+#include "global.h"
+#include "erl_process.h"
+#include "error.h"
+#define ERTS_WANT_DB_INTERNAL__
+#include "erl_db.h"
+#include "bif.h"
+#include "big.h"
+#include "erl_binary.h"
+
+#include "erl_db_catree.h"
+#include "erl_db_tree.h"
+#include "erl_db_tree_util.h"
+
+#ifdef DEBUG
+# define IF_DEBUG(X) X
+#else
+# define IF_DEBUG(X)
+#endif
+
+/*
+** Forward declarations
+*/
+
+static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left);
+static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left);
+static DbTableCATreeNode *catree_first_base_node_from_free_list(DbTableCATree *tb);
+
+/* Method interface functions */
+static int db_first_catree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_next_catree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_last_catree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_prev_catree(Process *p, DbTable *tbl,
+ Eterm key,
+ Eterm *ret);
+static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail);
+static int db_get_catree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_get_element_catree(Process *p, DbTable *tbl,
+ Eterm key,int ndex,
+ Eterm *ret);
+static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret);
+static int db_slot_catree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret);
+static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reversed, Eterm *ret);
+static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reversed, Eterm *ret);
+static int db_select_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_count_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_delete_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_take_catree(Process *, DbTable *, Eterm, Eterm *);
+static void db_print_catree(fmtfn_t to, void *to_arg,
+ int show, DbTable *tbl);
+static int db_free_table_catree(DbTable *tbl);
+static SWord db_free_table_continue_catree(DbTable *tbl, SWord);
+static void db_foreach_offheap_catree(DbTable *,
+ void (*)(ErlOffHeap *, void *),
+ void *);
+static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds);
+static int
+db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj,
+ DbUpdateHandle*);
+static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *);
+
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent);
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent);
+
+
+/*
+** External interface
+*/
+DbTableMethod db_catree =
+{
+ db_create_catree,
+ db_first_catree,
+ db_next_catree,
+ db_last_catree,
+ db_prev_catree,
+ db_put_catree,
+ db_get_catree,
+ db_get_element_catree,
+ db_member_catree,
+ db_erase_catree,
+ db_erase_object_catree,
+ db_slot_catree,
+ db_select_chunk_catree,
+ db_select_catree,
+ db_select_delete_catree,
+ db_select_continue_catree,
+ db_select_delete_continue_catree,
+ db_select_count_catree,
+ db_select_count_continue_catree,
+ db_select_replace_catree,
+ db_select_replace_continue_catree,
+ db_take_catree,
+ db_delete_all_objects_catree,
+ db_free_table_catree,
+ db_free_table_continue_catree,
+ db_print_catree,
+ db_foreach_offheap_catree,
+ db_lookup_dbterm_catree,
+ db_finalize_dbterm_catree
+
+};
+
+/*
+ * Constants
+ */
+
+#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 200
+#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1)
+#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10)
+#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000
+#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000)
+#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 14
+
+/*
+ * Internal CA tree related helper functions and macros
+ */
+
+#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term)
+#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
+#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))
+
+
+/* Helpers for reading and writing shared atomic variables */
+
+/* No memory barrier */
+#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
+#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
+#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
+#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
+
+
+/* Release or acquire barriers */
+#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
+#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
+#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
+#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
+
+/* Compares a key to the key in a route node */
+static ERTS_INLINE Sint cmp_key_route(Eterm key,
+ DbTableCATreeNode *obj)
+{
+ return CMP(key, GET_ROUTE_NODE_KEY(obj));
+}
+
+/*
+ * Used by the split_tree function
+ */
+static ERTS_INLINE
+int less_than_two_elements(TreeDbTerm *root)
+{
+ return root == NULL || (root->left == NULL && root->right == NULL);
+}
+
+/*
+ * Inserts a TreeDbTerm into a tree. Returns the new root.
+ */
+static ERTS_INLINE
+TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
+ TreeDbTerm *insert_to_root,
+ TreeDbTerm *value_to_insert) {
+ /* Non recursive insertion in AVL tree, building our own stack */
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm * base = insert_to_root;
+ TreeDbTerm **this = &base;
+ Sint c;
+ Eterm key;
+ int dir;
+ TreeDbTerm *p1, *p2, *p;
+
+ key = GETKEY(tb, value_to_insert->dbterm.tpl);
+
+ dstack[dpos++] = DIR_END;
+ for (;;)
+ if (!*this) { /* Found our place */
+ state = 1;
+ *this = value_to_insert;
+ (*this)->balance = 0;
+ (*this)->left = (*this)->right = NULL;
+ break;
+ } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
+ /* go lefts */
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else { /* go right */
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ }
+
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ p = *this;
+ if (dir == DIR_LEFT) {
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ break;
+ case -1: /* The icky case */
+ p1 = p->left;
+ if (p1->balance == -1) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RR rotation */
+ p2 = p1->right;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (p2->balance == -1) ? +1 : 0;
+ p1->balance = (p2->balance == 1) ? -1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ } else { /* dir == DIR_RIGHT */
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ break;
+ case 1:
+ p1 = p->right;
+ if (p1->balance == 1) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (p2->balance == 1) ? -1 : 0;
+ p1->balance = (p2->balance == -1) ? 1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ }
+ }
+ return base;
+}
+
+/*
+ * Split an AVL tree into two trees. The function stores the node
+ * containing the "split key" in the write back parameter
+ * split_key_wb. The function stores the left tree containing the keys
+ * that are smaller than the "split key" in the write back parameter
+ * left_wb and the tree containing the rest of the keys in the write
+ * back parameter right_wb.
+ */
+static void split_tree(DbTableCATree *tb,
+ TreeDbTerm *root,
+ TreeDbTerm **split_key_node_wb,
+ TreeDbTerm **left_wb,
+ TreeDbTerm **right_wb) {
+ TreeDbTerm * split_node = NULL;
+ TreeDbTerm * left_root;
+ TreeDbTerm * right_root;
+ if (root->left == NULL) { /* To get non empty split */
+ *right_wb = root->right;
+ *split_key_node_wb = root->right;
+ root->right = NULL;
+ root->balance = 0;
+ *left_wb = root;
+ return;
+ }
+ split_node = root;
+ left_root = split_node->left;
+ split_node->left = NULL;
+ right_root = split_node->right;
+ split_node->right = NULL;
+ right_root = insert_TreeDbTerm(tb, right_root, split_node);
+ *split_key_node_wb = split_node;
+ *left_wb = left_root;
+ *right_wb = right_root;
+}
+
+/*
+ * Used by the join_trees function
+ */
+static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root)
+{
+ if(root == NULL) {
+ return 0;
+ } else {
+ TreeDbTerm * current_node = root;
+ int hight_so_far = 1;
+ while (current_node->left != NULL || current_node->right != NULL) {
+ if (current_node->balance == -1) {
+ current_node = current_node->left;
+ } else {
+ current_node = current_node->right;
+ }
+ hight_so_far = hight_so_far + 1;
+ }
+ return hight_so_far;
+ }
+}
+
+/*
+ * Used by the join_trees function
+ */
+static ERTS_INLINE
+TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm **this = root;
+ int dir;
+ TreeDbTerm *q = NULL;
+
+ dstack[dpos++] = DIR_END;
+ for (;;) {
+ if (!*this) { /* Failure */
+ return NULL;
+ } else if (is_min && (*this)->left != NULL) {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else if (!is_min && (*this)->right != NULL) {
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ } else { /* Min value, found the one to splice out */
+ q = (*this);
+ if (q->right == NULL) {
+ (*this) = q->left;
+ state = 1;
+ } else if (q->left == NULL) {
+ (*this) = q->right;
+ state = 1;
+ }
+ break;
+ }
+ }
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ if (dir == DIR_LEFT) {
+ state = tree_balance_left(this);
+ } else {
+ state = tree_balance_right(this);
+ }
+ }
+ return q;
+}
+
+#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1)
+#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0)
+
+/*
+ * Joins two AVL trees where all the keys in the left one are smaller
+ * then the keys in the right one and returns the resulting tree.
+ *
+ * The algorithm is described on page 474 in D. E. Knuth. The Art of
+ * Computer Programming: Sorting and Searching,
+ * vol. 3. Addison-Wesley, 2nd edition, 1998.
+ */
+static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
+ TreeDbTerm *right_root_param)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 1;
+ TreeDbTerm **this;
+ int dir;
+ TreeDbTerm *p1, *p2, *p;
+ TreeDbTerm *left_root = left_root_param;
+ TreeDbTerm *right_root = right_root_param;
+ int left_height;
+ int right_height;
+ int current_height;
+ dstack[dpos++] = DIR_END;
+ if (left_root == NULL) {
+ return right_root;
+ } else if (right_root == NULL) {
+ return left_root;
+ }
+
+ left_height = compute_tree_hight(left_root);
+ right_height = compute_tree_hight(right_root);
+ if (left_height >= right_height) {
+ TreeDbTerm * new_root =
+ LINKOUT_MIN_TREE_NODE(&right_root);
+ int new_right_height = compute_tree_hight(right_root);
+ TreeDbTerm * current_node = left_root;
+ this = &left_root;
+ current_height = left_height;
+ while(current_height > new_right_height + 1) {
+ if (current_node->balance == -1) {
+ current_height = current_height - 2;
+ } else {
+ current_height = current_height - 1;
+ }
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ current_node = current_node->right;
+ }
+ new_root->left = current_node;
+ new_root->right = right_root;
+ new_root->balance = new_right_height - current_height;
+ *this = new_root;
+ } else {
+ /* This case is symmetric to the previous case */
+ TreeDbTerm * new_root =
+ LINKOUT_MAX_TREE_NODE(&left_root);
+ int new_left_height = compute_tree_hight(left_root);
+ TreeDbTerm * current_node = right_root;
+ this = &right_root;
+ current_height = right_height;
+ while (current_height > new_left_height + 1) {
+ if (current_node->balance == 1) {
+ current_height = current_height - 2;
+ } else {
+ current_height = current_height - 1;
+ }
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ current_node = current_node->left;
+ }
+ new_root->right = current_node;
+ new_root->left = left_root;
+ new_root->balance = current_height - new_left_height;
+ *this = new_root;
+ }
+ /* Now we need to continue as if this was during the insert */
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ p = *this;
+ if (dir == DIR_LEFT) {
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ break;
+ case -1: /* The icky case */
+ p1 = p->left;
+ if (p1->balance == -1) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RR rotation */
+ p2 = p1->right;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (p2->balance == -1) ? +1 : 0;
+ p1->balance = (p2->balance == 1) ? -1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ } else { /* dir == DIR_RIGHT */
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ break;
+ case 1:
+ p1 = p->right;
+ if (p1->balance == 1) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (p2->balance == 1) ? -1 : 0;
+ p1->balance = (p2->balance == -1) ? 1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ }
+ }
+ /* Return the joined tree */
+ if (left_height >= right_height) {
+ return left_root;
+ } else {
+ return right_root;
+ }
+}
+
+#ifdef DEBUG
+# define FORCE_RANDOM_SPLIT_JOIN
+#endif
+#ifdef FORCE_RANDOM_SPLIT_JOIN
+static int dbg_fastrand(void)
+{
+ static int g_seed = 648835;
+ g_seed = (214013*g_seed+2531011);
+ return (g_seed>>16)&0x7FFF;
+}
+
+static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node)
+{
+ switch (dbg_fastrand() % 8) {
+ case 1:
+ base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT;
+ break;
+ case 2:
+ base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT;
+ break;
+ }
+}
+#else
+# define dbg_maybe_force_splitjoin(N)
+#endif /* FORCE_RANDOM_SPLIT_JOIN */
+
+static ERTS_INLINE
+int try_wlock_base_node(DbTableCATreeBaseNode *base_node)
+{
+ return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock);
+}
+
+/*
+ * Locks a base node without adjusting the lock statistics
+ */
+static ERTS_INLINE
+void wlock_base_node_no_stats(DbTableCATreeNode *base_node)
+{
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rwlock(&base_node->u.base.lock);
+}
+
+/*
+ * Locks a base node and adjusts the lock statistics according to if
+ * the lock was contended or not
+ */
+static ERTS_INLINE
+void wlock_base_node(DbTableCATreeNode *base_node)
+{
+ ASSERT(base_node->is_base_node);
+ if (try_wlock_base_node(&base_node->u.base)) {
+ /* The lock is contended */
+ wlock_base_node_no_stats(base_node);
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION;
+ } else {
+ base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION;
+ }
+}
+
+static ERTS_INLINE
+void wunlock_base_node(DbTableCATreeNode *base_node)
+{
+ erts_rwmtx_rwunlock(&base_node->u.base.lock);
+}
+
+static ERTS_INLINE
+void wunlock_adapt_base_node(DbTableCATree* tb,
+ DbTableCATreeNode* node,
+ DbTableCATreeNode* parent,
+ int current_level)
+{
+ dbg_maybe_force_splitjoin(node);
+ if ((!node->u.base.root && parent && !(tb->common.status
+ & DB_CATREE_FORCE_SPLIT))
+ || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) {
+ join_catree(tb, node, parent);
+ }
+ else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT
+ && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) {
+ split_catree(tb, node, parent);
+ }
+ else {
+ wunlock_base_node(node);
+ }
+}
+
+static ERTS_INLINE
+void rlock_base_node(DbTableCATreeNode *base_node)
+{
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_rlock(&base_node->u.base.lock);
+}
+
+static ERTS_INLINE
+void runlock_base_node(DbTableCATreeNode *base_node)
+{
+ ASSERT(base_node->is_base_node);
+ erts_rwmtx_runlock(&base_node->u.base.lock);
+}
+
+static ERTS_INLINE
+void lock_route_node(DbTableCATreeNode *route_node)
+{
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_lock(&route_node->u.route.lock);
+}
+
+static ERTS_INLINE
+void unlock_route_node(DbTableCATreeNode *route_node)
+{
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_unlock(&route_node->u.route.lock);
+}
+
+static ERTS_INLINE
+void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter,
+ int read_only)
+{
+ iter->tb = tb;
+ iter->read_only = read_only;
+ iter->locked_bnode = NULL;
+ iter->next_route_key = THE_NON_VALUE;
+ iter->search_key = NULL;
+}
+
+static ERTS_INLINE
+void lock_iter_base_node(CATreeRootIterator* iter,
+ DbTableCATreeNode *base_node,
+ DbTableCATreeNode *parent,
+ int current_level)
+{
+ ASSERT(!iter->locked_bnode);
+ if (iter->read_only)
+ rlock_base_node(base_node);
+ else {
+ wlock_base_node(base_node);
+ iter->bnode_parent = parent;
+ iter->bnode_level = current_level;
+ }
+ iter->locked_bnode = base_node;
+}
+
+static ERTS_INLINE
+void unlock_iter_base_node(CATreeRootIterator* iter)
+{
+ ASSERT(iter->locked_bnode);
+ if (iter->read_only)
+ runlock_base_node(iter->locked_bnode);
+ else if (iter->locked_bnode->u.base.is_valid) {
+ wunlock_adapt_base_node(iter->tb, iter->locked_bnode,
+ iter->bnode_parent, iter->bnode_level);
+ }
+ else
+ wunlock_base_node(iter->locked_bnode);
+ iter->locked_bnode = NULL;
+}
+
+static ERTS_INLINE
+void destroy_root_iterator(CATreeRootIterator* iter)
+{
+ if (iter->locked_bnode)
+ unlock_iter_base_node(iter);
+ if (iter->search_key)
+ erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
+}
+
+
+typedef struct
+{
+ DbTableCATreeNode *parent;
+ int current_level;
+} FindBaseNode;
+
+static ERTS_INLINE
+DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
+ if (fbn) {
+ fbn->parent = NULL;
+ fbn->current_level = 0;
+ }
+ while (!node->is_base_node) {
+ if (fbn) {
+ fbn->current_level++;
+ fbn->parent = node;
+ }
+ if (cmp_key_route(key, node) < 0) {
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ return node;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
+{
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(tb, key, NULL);
+ rlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ runlock_base_node(base_node);
+ }
+ return base_node;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(tb, key, fbn);
+ wlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ wunlock_base_node(base_node);
+ }
+ return base_node;
+}
+
+static ERTS_INLINE
+Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+{
+ dst->size = key_size;
+ if (key_size != 0) {
+ Eterm* hp = &dst->heap[0];
+ ErlOffHeap tmp_offheap;
+ tmp_offheap.first = NULL;
+ dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
+ dst->oh = tmp_offheap.first;
+ }
+ else {
+ ASSERT(is_immed(key));
+ dst->term = key;
+ dst->oh = NULL;
+ }
+ return dst->term;
+}
+
+static ERTS_INLINE
+void destroy_route_key(DbRouteKey* key)
+{
+ if (key->oh) {
+ ErlOffHeap oh;
+ oh.first = key->oh;
+ erts_cleanup_offheap(&oh);
+ }
+}
+
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+# define LC_ORDER(ORDER) ORDER
+#else
+# define LC_ORDER(ORDER) NIL
+#endif
+
+#define sizeof_base_node() \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+
+static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
+ TreeDbTerm* root)
+{
+ DbTableCATreeNode *p;
+ erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
+ p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
+ sizeof_base_node());
+
+ p->is_base_node = 1;
+ p->u.base.root = root;
+ if (tb->common.type & DB_FREQ_READ)
+ rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
+ if (erts_ets_rwmtx_spin_count >= 0)
+ rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
+
+ erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
+ "erl_db_catree_base_node",
+ NIL,
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT)
+ ? INT_MAX : 0);
+ p->u.base.is_valid = 1;
+ return p;
+}
+
+static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
+{
+ return (offsetof(DbTableCATreeNode, u.route.key.heap)
+ + key_size*sizeof(Eterm));
+}
+
+static DbTableCATreeNode*
+create_route_node(DbTableCATree *tb,
+ DbTableCATreeNode *left,
+ DbTableCATreeNode *right,
+ DbTerm * keyTerm,
+ DbTableCATreeNode* lc_parent)
+{
+ Eterm key = GETKEY(tb,keyTerm->tpl);
+ int key_size = size_object(key);
+ DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof_route_node(key_size));
+
+ copy_route_key(&p->u.route.key, key, key_size);
+ p->is_base_node = 0;
+ p->u.route.is_valid = 1;
+ erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
+ erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ /* Route node lock order is inverse tree depth (from leafs toward root) */
+ p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
+ lc_parent->u.route.lc_order - 1);
+ /*
+ * This assert may eventually fail as we don't increase 'lc_order' in join
+ * operations when route nodes move up in the tree.
+ * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
+ */
+ ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
+#endif
+ erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
+ LC_ORDER(make_small(p->u.route.lc_order)),
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ return p;
+}
+
+static void do_free_base_node(void* vptr)
+{
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(p->is_base_node);
+ erts_rwmtx_destroy(&p->u.base.lock);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0);
+ do_free_base_node(p);
+}
+
+static void do_free_route_node(void *vptr)
+{
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(!p->is_base_node);
+ erts_mtx_destroy(&p->u.route.lock);
+ destroy_route_key(&p->u.route.key);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(!p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
+ do_free_route_node(p);
+}
+
+
+/*
+ * Returns the parent routing node of the specified
+ * route node 'child' if such a parent exists
+ * or NULL if 'child' is attached to the root.
+ */
+static ERTS_INLINE DbTableCATreeNode *
+parent_of(DbTableCATree *tb,
+ DbTableCATreeNode *child)
+{
+ Eterm key = GET_ROUTE_NODE_KEY(child);
+ DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev = NULL;
+
+ while (current != child) {
+ prev = current;
+ if (cmp_key_route(key, current) < 0) {
+ current = GET_LEFT_ACQB(current);
+ } else {
+ current = GET_RIGHT_ACQB(current);
+ }
+ }
+ return prev;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+leftmost_base_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ while (!node->is_base_node) {
+ node = GET_LEFT_ACQB(node);
+ }
+ return node;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+rightmost_base_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ while (!node->is_base_node) {
+ node = GET_RIGHT_ACQB(node);
+ }
+ return node;
+}
+
+
+static ERTS_INLINE DbTableCATreeNode *
+leftmost_route_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode *node = root;
+ DbTableCATreeNode *prev_node = NULL;
+ while (!node->is_base_node) {
+ prev_node = node;
+ node = GET_LEFT_ACQB(node);
+ }
+ return prev_node;
+}
+
+static ERTS_INLINE DbTableCATreeNode*
+rightmost_route_node(DbTableCATreeNode *root)
+{
+ DbTableCATreeNode * node = root;
+ DbTableCATreeNode * prev_node = NULL;
+ while (!node->is_base_node) {
+ prev_node = node;
+ node = GET_RIGHT_ACQB(node);
+ }
+ return prev_node;
+}
+
+static ERTS_INLINE
+void init_tree_stack(DbTreeStack *stack,
+ TreeDbTerm **stack_array,
+ Uint init_slot)
+{
+ stack->array = stack_array;
+ stack->pos = 0;
+ stack->slot = init_slot;
+}
+
+static void join_catree(DbTableCATree *tb,
+ DbTableCATreeNode *thiz,
+ DbTableCATreeNode *parent)
+{
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+
+ ASSERT(thiz->is_base_node);
+ if (parent == NULL) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ return;
+ }
+ ASSERT(!parent->is_base_node);
+ if (GET_LEFT(parent) == thiz) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
+ /* Failed to acquire lock */
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ return;
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
+ return;
+ } else {
+ lock_route_node(parent);
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
+ gparent = NULL;
+ do {
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL)
+ lock_route_node(gparent);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+
+ if (gparent == NULL) {
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
+ } else {
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
+ }
+ unlock_route_node(parent);
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ {
+ TreeDbTerm* new_root = join_trees(thiz->u.base.root,
+ neighbor->u.base.root);
+ new_neighbor = create_base_node(tb, new_root);
+ }
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
+ } else {
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
+ }
+ }
+ } else { /* Symetric case */
+ ASSERT(GET_RIGHT(parent) == thiz);
+ neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
+ /* Failed to acquire lock */
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ return;
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
+ return;
+ } else {
+ lock_route_node(parent);
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
+ gparent = NULL;
+ do {
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
+ lock_route_node(gparent);
+ } else {
+ gparent = NULL;
+ }
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+ if (gparent == NULL) {
+ SET_ROOT_RELB(tb, GET_LEFT(parent));
+ } else if (GET_RIGHT(gparent) == parent) {
+ SET_RIGHT_RELB(gparent, GET_LEFT(parent));
+ } else {
+ SET_LEFT_RELB(gparent, GET_LEFT(parent));
+ }
+ unlock_route_node(parent);
+ if (gparent != NULL) {
+ unlock_route_node(gparent);
+ }
+ {
+ TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
+ thiz->u.base.root);
+ new_neighbor = create_base_node(tb, new_root);
+ }
+ if (GET_LEFT(parent) == neighbor) {
+ neighbor_parent = gparent;
+ } else {
+ neighbor_parent =
+ rightmost_route_node(GET_LEFT(parent));
+ }
+ }
+ }
+ /* Link in new neighbor and free nodes that are no longer in the tree */
+ if (neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
+ } else {
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
+ }
+ wunlock_base_node(thiz);
+ wunlock_base_node(neighbor);
+ /* Free the parent and base */
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node());
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node());
+}
+
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT base,
+ DbTableCATreeNode* ERTS_RESTRICT parent)
+{
+ TreeDbTerm *splitOutWriteBack;
+ DbTableCATreeNode* ERTS_RESTRICT new_left;
+ DbTableCATreeNode* ERTS_RESTRICT new_right;
+ DbTableCATreeNode* ERTS_RESTRICT new_route;
+
+ if (less_than_two_elements(base->u.base.root)) {
+ if (!(tb->common.status & DB_CATREE_FORCE_SPLIT))
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(base);
+ return;
+ } else {
+ TreeDbTerm *left_tree;
+ TreeDbTerm *right_tree;
+
+ split_tree(tb, base->u.base.root, &splitOutWriteBack,
+ &left_tree, &right_tree);
+
+ new_left = create_base_node(tb, left_tree);
+ new_right = create_base_node(tb, right_tree);
+ new_route = create_route_node(tb,
+ new_left,
+ new_right,
+ &splitOutWriteBack->dbterm,
+ parent);
+ if (parent == NULL) {
+ SET_ROOT_RELB(tb, new_route);
+ } else if(GET_LEFT(parent) == base) {
+ SET_LEFT_RELB(parent, new_route);
+ } else {
+ SET_RIGHT_RELB(parent, new_route);
+ }
+ base->u.base.is_valid = 0;
+ wunlock_base_node(base);
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ base,
+ &base->u.base.free_item,
+ sizeof_base_node());
+ }
+}
+
+/*
+ * Helper functions for removing the table
+ */
+
+static void catree_add_base_node_to_free_list(
+ DbTableCATree *tb,
+ DbTableCATreeNode *base_node_container)
+{
+ base_node_container->u.base.next =
+ tb->base_nodes_to_free_list;
+ tb->base_nodes_to_free_list = base_node_container;
+}
+
+static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
+{
+ if (tb->base_nodes_to_free_list == NULL) {
+ return; /* List empty */
+ } else {
+ DbTableCATreeNode *first = tb->base_nodes_to_free_list;
+ tb->base_nodes_to_free_list = first->u.base.next;
+ }
+}
+
+static DbTableCATreeNode *catree_first_base_node_from_free_list(
+ DbTableCATree *tb)
+{
+ return tb->base_nodes_to_free_list;
+}
+
+static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left)
+{
+ DbTableCATreeNode *root;
+ DbTableCATreeNode *p;
+ for (;;) {
+ root = POP_NODE(&tb->free_stack_rnodes);
+ if (root == NULL) break;
+ else if(root->is_base_node) {
+ catree_add_base_node_to_free_list(tb, root);
+ break;
+ }
+ for (;;) {
+ if ((GET_LEFT(root) != NULL) &&
+ (p = GET_LEFT(root))->is_base_node) {
+ SET_LEFT(root, NULL);
+ catree_add_base_node_to_free_list(tb, p);
+ } else if ((GET_RIGHT(root) != NULL) &&
+ (p = GET_RIGHT(root))->is_base_node) {
+ SET_RIGHT(root, NULL);
+ catree_add_base_node_to_free_list(tb, p);
+ } else if ((p = GET_LEFT(root)) != NULL) {
+ SET_LEFT(root, NULL);
+ PUSH_NODE(&tb->free_stack_rnodes, root);
+ root = p;
+ } else if ((p = GET_RIGHT(root)) != NULL) {
+ SET_RIGHT(root, NULL);
+ PUSH_NODE(&tb->free_stack_rnodes, root);
+ root = p;
+ } else {
+ free_catree_route_node(tb, root);
+ if (--num_left >= 0) {
+ break;
+ } else {
+ return num_left; /* Done enough for now */
+ }
+ }
+ }
+ }
+ return num_left;
+}
+
+static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
+{
+ TreeDbTerm *root;
+ TreeDbTerm *p;
+ DbTableCATreeNode *base_node_container =
+ catree_first_base_node_from_free_list(tb);
+ for (;;) {
+ root = POP_NODE(&tb->free_stack_elems);
+ if (root == NULL) break;
+ for (;;) {
+ if ((p = root->left) != NULL) {
+ root->left = NULL;
+ PUSH_NODE(&tb->free_stack_elems, root);
+ root = p;
+ } else if ((p = root->right) != NULL) {
+ root->right = NULL;
+ PUSH_NODE(&tb->free_stack_elems, root);
+ root = p;
+ } else {
+ free_term((DbTable*)tb, root);
+ if (--num_left >= 0) {
+ break;
+ } else {
+ return num_left; /* Done enough for now */
+ }
+ }
+ }
+ }
+ catree_deque_base_node_from_free_list(tb);
+ free_catree_base_node(tb, base_node_container);
+ base_node_container = catree_first_base_node_from_free_list(tb);
+ if (base_node_container != NULL) {
+ PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root);
+ }
+ return num_left;
+}
+
+
+/*
+** Initialization function
+*/
+
+void db_initialize_catree(void)
+{
+ return;
+};
+
+/*
+** Table interface routines (i.e., what's called by the bif's)
+*/
+
+int db_create_catree(Process *p, DbTable *tbl)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode *root;
+
+ root = create_base_node(tb, NULL);
+ tb->deletion = 0;
+ tb->base_nodes_to_free_list = NULL;
+ erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
+ return DB_ERROR_NONE;
+}
+
+static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_first_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_next_root(&iter, NULL);
+ root = pp ? *pp : NULL;
+ }
+
+ result = db_first_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_next_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
+ break;
+
+ rootp = catree_find_next_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ TreeDbTerm *root;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = *catree_find_last_root(&iter);
+ if (!root) {
+ TreeDbTerm **pp = catree_find_prev_root(&iter, NULL);
+ root = pp ? *pp : NULL;
+ }
+
+ result = db_last_tree_common(p, tbl, root, ret, NULL);
+
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ TreeDbTerm **rootp;
+ CATreeRootIterator iter;
+ int result;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ iter.next_route_key = key;
+ rootp = catree_find_prev_root(&iter, NULL);
+
+ do {
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret,
+ &stack);
+ if (result != DB_ERROR_NONE || *ret != am_EOT)
+ break;
+ rootp = catree_find_prev_root(&iter, NULL);
+ } while (rootp);
+
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
+{
+ DbTableCATree *tb = &tbl->catree;
+ Eterm key = GETKEY(&tb->common, tuple_val(obj));
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
+ key_clash_fail, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
+}
+
+static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
+{
+ FindBaseNode fbn;
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(iter->tb, key, &fbn);
+ lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level);
+ if (base_node->u.base.is_valid)
+ break;
+ unlock_iter_base_node(iter);
+ }
+ return &base_node->u.base.root;
+}
+
+static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key)
+{
+ Uint key_size;
+
+ if (is_immed(key))
+ return key;
+
+ if (iter->search_key) {
+ if (key == iter->search_key->term)
+ return key; /* already saved */
+ destroy_route_key(iter->search_key);
+ }
+ key_size = size_object(key);
+ if (!iter->search_key || key_size > iter->search_key->size) {
+ iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP,
+ iter->search_key,
+ (offsetof(DbRouteKey, heap)
+ + key_size*sizeof(Eterm)));
+ }
+ return copy_route_key(iter->search_key, key, key_size);
+}
+
+TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter,
+ int forward,
+ Eterm *search_keyp)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_invalid = NULL;
+ DbTableCATreeNode *rejected_empty = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ Eterm route_key = iter->next_route_key;
+ int current_level;
+
+ if (iter->locked_bnode) {
+ if (search_keyp)
+ *search_keyp = save_iter_search_key(iter, *search_keyp);
+ unlock_iter_base_node(iter);
+ }
+
+ if (is_non_value(route_key))
+ return NULL;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (forward) {
+ if (cmp_key_route(route_key,node) < 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ else {
+ if (cmp_key_route(route_key,node) > 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ }
+ ASSERT(node != rejected_invalid);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ ASSERT(node != rejected_empty);
+ if (node->u.base.root) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ iter->locked_bnode = node;
+ return &node->u.base.root;
+ }
+ if (!next_route_node) {
+ unlock_iter_base_node(iter);
+ return NULL;
+ }
+ route_key = next_route_node->u.route.key.term;
+ IF_DEBUG(rejected_empty = node);
+ }
+ else
+ IF_DEBUG(rejected_invalid = node);
+
+ /* Retry */
+ unlock_iter_base_node(iter);
+ }
+}
+
+TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 1, keyp);
+}
+
+TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp)
+{
+ return catree_find_nextprev_root(iter, 0, keyp);
+}
+
+/* @brief Find root of tree where object with smallest key of all larger than
+ * partially bound key may reside. Can be used as a starting point for
+ * a reverse iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param iter An initialized root iterator.
+ *
+ * @return Pointer to found root pointer. May not be NULL.
+ */
+TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) {
+ next_route_node = node;
+ node = GET_RIGHT_ACQB(node);
+ } else {
+ node = GET_LEFT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+/* @brief Find root of tree where object with largest key of all smaller than
+ * partially bound key may reside. Can be used as a starting point for
+ * a forward iteration with pb_key.
+ *
+ * @param pb_key The partially bound key. Example {42, '$1'}
+ * @param iter An initialized root iterator.
+ *
+ * @return Pointer to found root pointer. May not be NULL.
+ */
+TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key,
+ CATreeRootIterator* iter)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ ASSERT(!iter->locked_bnode);
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ parent = NULL;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ parent = node;
+ if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) {
+ next_route_node = node;
+ node = GET_LEFT_ACQB(node);
+ } else {
+ node = GET_RIGHT_ACQB(node);
+ }
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, parent, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter,
+ int first)
+{
+#ifdef DEBUG
+ DbTableCATreeNode *rejected_base = NULL;
+#endif
+ DbTableCATreeNode *node;
+ DbTableCATreeNode* next_route_node;
+ int current_level;
+
+ while (1) {
+ node = GET_ROOT_ACQB(iter->tb);
+ current_level = 0;
+ next_route_node = NULL;
+ while (!node->is_base_node) {
+ current_level++;
+ next_route_node = node;
+ node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node);
+ }
+ ASSERT(node != rejected_base);
+ lock_iter_base_node(iter, node, next_route_node, current_level);
+ if (node->u.base.is_valid) {
+ iter->next_route_key = (next_route_node ?
+ next_route_node->u.route.key.term :
+ THE_NON_VALUE);
+ return &node->u.base.root;
+ }
+ /* Retry */
+ unlock_iter_base_node(iter);
+#ifdef DEBUG
+ rejected_base = node;
+#endif
+ }
+}
+
+TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 1);
+}
+
+TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
+{
+ return catree_find_firstlast_root(iter, 0);
+}
+
+static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_member_tree_common(&tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+static int db_get_element_catree(Process *p, DbTable *tbl,
+ Eterm key, int ndex, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_element_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ndex, ret, NULL);
+ runlock_base_node(node);
+ return result;
+}
+
+static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_tree_common(tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
+}
+
+static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ Eterm key = GETKEY(&tb->common, tuple_val(object));
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_object_tree_common(tbl,
+ &node->u.base.root,
+ object,
+ ret,
+ NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
+}
+
+
+static int db_slot_catree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter),
+ slot_term, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_continue_tree_common(p, &tbl->common,
+ continuation, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, int reverse, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_count_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_continue_tree_common(p, tbl,
+ continuation, ret, NULL,
+ &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_count_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Sint chunk_size,
+ int reversed, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ result = db_select_chunk_tree_common(p, tbl,
+ tid, pattern, chunk_size, reversed, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_delete_continue_catree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_select_delete_continue_tree_common(p, tbl, continuation, ret,
+ &stack, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ DbTreeStack stack;
+ TreeDbTerm * stack_array[STACK_NEED];
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ init_tree_stack(&stack, stack_array, 0);
+ result = db_select_delete_tree_common(p, tbl,
+ tid, pattern, ret, &stack,
+ &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_tree_common(p, tbl,
+ tid, pattern, ret, NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret)
+{
+ int result;
+ CATreeRootIterator iter;
+
+ init_root_iterator(&tbl->catree, &iter, 0);
+ result = db_select_replace_continue_tree_common(p, tbl, continuation, ret,
+ NULL, &iter);
+ destroy_root_iterator(&iter);
+ return result;
+}
+
+static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableCATree *tb = &tbl->catree;
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
+}
+
+/*
+** Other interface routines (not directly coupled to one bif)
+*/
+
+
+/* Display tree contents (for dump) */
+static void db_print_catree(fmtfn_t to, void *to_arg,
+ int show, DbTable *tbl)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_print_tree_common(to, to_arg, show, *root, tbl);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+}
+
+/* Release all memory occupied by a single table */
+static int db_free_table_catree(DbTable *tbl)
+{
+ while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0)
+ ;
+ return 1;
+}
+
+static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
+{
+ DbTableCATreeNode *first_base_node;
+ DbTableCATree *tb = &tbl->catree;
+ if (!tb->deletion) {
+ tb->deletion = 1;
+ tb->free_stack_elems.array =
+ erts_db_alloc(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ tb->free_stack_elems.pos = 0;
+ tb->free_stack_elems.slot = 0;
+ tb->free_stack_rnodes.array =
+ erts_db_alloc(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ sizeof(DbTableCATreeNode *) * STACK_NEED);
+ tb->free_stack_rnodes.pos = 0;
+ tb->free_stack_rnodes.size = STACK_NEED;
+ PUSH_NODE(&tb->free_stack_rnodes, GET_ROOT(tb));
+ tb->is_routing_nodes_freed = 0;
+ tb->base_nodes_to_free_list = NULL;
+ }
+ if ( ! tb->is_routing_nodes_freed ) {
+ reds = do_free_routing_nodes_catree_cont(tb, reds);
+ if (reds < 0) {
+ return reds; /* Not finished */
+ } else {
+ tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
+ first_base_node = catree_first_base_node_from_free_list(tb);
+ PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root);
+ }
+ }
+ while (catree_first_base_node_from_free_list(tb) != NULL) {
+ reds = do_free_base_node_cont(tb, reds);
+ if (reds < 0) {
+ return reds; /* Continue later */
+ }
+ }
+ /* Time to free the main structure*/
+ erts_db_free(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ (void *) tb->free_stack_elems.array,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ erts_db_free(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ (void *) tb->free_stack_rnodes.array,
+ sizeof(DbTableCATreeNode *) * STACK_NEED);
+ return 1;
+}
+
+static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds)
+{
+ reds = db_free_table_continue_catree(tbl, reds);
+ if (reds < 0)
+ return reds;
+ db_create_catree(p, tbl);
+ erts_atomic_set_nob(&tbl->catree.common.nitems, 0);
+ return reds;
+}
+
+
+static void db_foreach_offheap_catree(DbTable *tbl,
+ void (*func)(ErlOffHeap *, void *),
+ void *arg)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(&tbl->catree, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ db_foreach_offheap_tree_common(*root, func, arg);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+}
+
+static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
+ DbUpdateHandle *handle)
+{
+ DbTableCATree *tb = &tbl->catree;
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
+ obj, handle, NULL);
+ if (res == 0) {
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ } else {
+ /* db_finalize_dbterm_catree will unlock */
+ handle->u.catree.base_node = node;
+ handle->u.catree.parent = fbn.parent;
+ handle->u.catree.current_level = fbn.current_level;
+ }
+ return res;
+}
+
+static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
+{
+ DbTableCATree *tb = &(handle->tb->catree);
+ db_finalize_dbterm_tree_common(cret, handle, NULL);
+ wunlock_adapt_base_node(tb, handle->u.catree.base_node,
+ handle->u.catree.parent,
+ handle->u.catree.current_level);
+ return;
+}
+
+#ifdef ERTS_ENABLE_LOCK_COUNT
+static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb,
+ DbTableCATreeNode *node,
+ int enable)
+{
+ erts_lcnt_ref_t *lcnt_ref;
+ erts_lock_flags_t lock_type;
+ if (node->is_base_node) {
+ lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt;
+ lock_type = ERTS_LOCK_TYPE_RWMUTEX;
+ } else {
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable);
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable);
+ lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt;
+ lock_type = ERTS_LOCK_TYPE_MUTEX;
+ }
+ if (enable) {
+ erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name,
+ lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB);
+ } else {
+ erts_lcnt_uninstall(lcnt_ref);
+ }
+}
+
+void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable)
+{
+ erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable);
+}
+#endif /* ERTS_ENABLE_LOCK_COUNT */
+
+void db_catree_force_split(DbTableCATree* tb, int on)
+{
+ CATreeRootIterator iter;
+ TreeDbTerm** root;
+
+ init_root_iterator(tb, &iter, 1);
+ root = catree_find_first_root(&iter);
+ do {
+ iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0);
+ root = catree_find_next_root(&iter, NULL);
+ } while (root);
+ destroy_root_iterator(&iter);
+
+ if (on)
+ tb->common.status |= DB_CATREE_FORCE_SPLIT;
+ else
+ tb->common.status &= ~DB_CATREE_FORCE_SPLIT;
+}
+
+void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats)
+{
+ DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT];
+ DbTableCATreeNode* node;
+ Uint depth = 0;
+
+ stats->route_nodes = 0;
+ stats->base_nodes = 0;
+ stats->max_depth = 0;
+
+ node = GET_ROOT(tb);
+ do {
+ while (!node->is_base_node) {
+ stats->route_nodes++;
+ ASSERT(depth < sizeof(stack)/sizeof(*stack));
+ stack[depth++] = node; /* PUSH parent */
+ if (stats->max_depth < depth)
+ stats->max_depth = depth;
+ node = GET_LEFT(node);
+ }
+ stats->base_nodes++;
+
+ while (depth > 0) {
+ DbTableCATreeNode* parent = stack[depth-1];
+ if (node == GET_LEFT(parent)) {
+ node = GET_RIGHT(parent);
+ break;
+ }
+ else {
+ ASSERT(node == GET_RIGHT(parent));
+ node = parent;
+ depth--; /* POP parent */
+ }
+ }
+ } while (depth > 0);
+}
+
+#ifdef HARDDEBUG
+
+/*
+ * Not called, but kept as it might come to use
+ */
+static inline int my_check_table_tree(TreeDbTerm *t)
+{
+ int lh, rh;
+ if (t == NULL)
+ return 0;
+ lh = my_check_table_tree(t->left);
+ rh = my_check_table_tree(t->right);
+ if ((rh - lh) != t->balance) {
+ erts_fprintf(stderr, "Invalid tree balance for this node:\n");
+ erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n",
+ t->balance, t->left, t->right);
+ erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
+ erts_fprintf(stderr,"\n---------------------------------\n");
+ abort();
+ }
+ return ((rh > lh) ? rh : lh) + 1;
+}
+
+#endif