diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_catree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 2243 |
1 files changed, 2243 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c new file mode 100644 index 0000000000..639c7e5e03 --- /dev/null +++ b/erts/emulator/beam/erl_db_catree.c @@ -0,0 +1,2243 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Implementation of ETS ordered_set table type with + * fine-grained synchronization. + * + * Author: Kjell Winblad + * + * This implementation is based on the contention adapting search tree + * (CA tree). The CA tree is a concurrent data structure that + * dynamically adapts its synchronization granularity based on how + * much contention is detected in locks. The following publication + * contains a detailed description of CA trees: + * + * A Contention Adapting Approach to Concurrent Ordered Sets + * Journal of Parallel and Distributed Computing, 2018 + * Kjell Winblad and Konstantinos Sagonas + * https://doi.org/10.1016/j.jpdc.2017.11.007 + * + * The following publication may also be interesting as it discusses + * how the CA tree can be used as an ETS ordered_set table type + * backend: + * + * More Scalable Ordered Set for ETS Using Adaptation + * In Thirteenth ACM SIGPLAN workshop on Erlang (2014) + * Kjell Winblad and Konstantinos Sagonas + * https://doi.org/10.1145/2633448.2633455 + * + * This implementation of the ordered_set ETS table type is only + * activated when the options {write_concurrency, true}, public and + * ordered_set are passed to the ets:new/2 function. This + * implementation is expected to scale better than the default + * implementation (located in "erl_db_tree.c") when concurrent + * processes use the following ETS operations to operate on a table: + * + * delete/2, delete_object/2, first/1, insert/2 (single object), + * insert_new/2 (single object), lookup/2, lookup_element/2, member/2, + * next/2, take/2 and update_element/3 (single object). + * + * Currently, the implementation does not have scalable support for + * the other operations (e.g., select/2). These operations are handled + * by merging all locks so that all terms get protected by a single + * lock. This implementation may thus perform worse than the default + * implementation in some scenarios. For example, when concurrent + * processes access a table with the operations insert/2, delete/2 and + * select/2, the insert/2 and delete/2 operations will trigger splits + * of locks (to get more fine-grained synchronization) but this will + * quickly be undone by the select/2 operation if this operation is + * also called frequently. + * + * The default implementation has a static stack optimization (see + * get_static_stack in erl_db_tree.c). This implementation does not + * have such an optimization as it induces bad scalability when + * concurrent read operations are frequent (they all try to get hold + * of the same stack). The default implementation may thus perform + * better compared to this implementation in scenarios where the + * static stack optimization is useful. One such scenario is when only + * one process is accessing the table and this process is traversing + * the table with a sequence of next/2 calls. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "sys.h" +#include "erl_vm.h" +#include "global.h" +#include "erl_process.h" +#include "error.h" +#define ERTS_WANT_DB_INTERNAL__ +#include "erl_db.h" +#include "bif.h" +#include "big.h" +#include "erl_binary.h" + +#include "erl_db_catree.h" +#include "erl_db_tree.h" +#include "erl_db_tree_util.h" + +#ifdef DEBUG +# define IF_DEBUG(X) X +#else +# define IF_DEBUG(X) +#endif + +/* +** Forward declarations +*/ + +static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left); +static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left); +static DbTableCATreeNode *catree_first_base_node_from_free_list(DbTableCATree *tb); + +/* Method interface functions */ +static int db_first_catree(Process *p, DbTable *tbl, + Eterm *ret); +static int db_next_catree(Process *p, DbTable *tbl, + Eterm key, Eterm *ret); +static int db_last_catree(Process *p, DbTable *tbl, + Eterm *ret); +static int db_prev_catree(Process *p, DbTable *tbl, + Eterm key, + Eterm *ret); +static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail); +static int db_get_catree(Process *p, DbTable *tbl, + Eterm key, Eterm *ret); +static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret); +static int db_get_element_catree(Process *p, DbTable *tbl, + Eterm key,int ndex, + Eterm *ret); +static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret); +static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret); +static int db_slot_catree(Process *p, DbTable *tbl, + Eterm slot_term, Eterm *ret); +static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, int reversed, Eterm *ret); +static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, + int reversed, Eterm *ret); +static int db_select_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_count_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_delete_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_replace_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_take_catree(Process *, DbTable *, Eterm, Eterm *); +static void db_print_catree(fmtfn_t to, void *to_arg, + int show, DbTable *tbl); +static int db_free_table_catree(DbTable *tbl); +static SWord db_free_table_continue_catree(DbTable *tbl, SWord); +static void db_foreach_offheap_catree(DbTable *, + void (*)(ErlOffHeap *, void *), + void *); +static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds); +static int +db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, + DbUpdateHandle*); +static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); + +static void split_catree(DbTableCATree *tb, + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent); +static void join_catree(DbTableCATree *tb, + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent); + + +/* +** External interface +*/ +DbTableMethod db_catree = +{ + db_create_catree, + db_first_catree, + db_next_catree, + db_last_catree, + db_prev_catree, + db_put_catree, + db_get_catree, + db_get_element_catree, + db_member_catree, + db_erase_catree, + db_erase_object_catree, + db_slot_catree, + db_select_chunk_catree, + db_select_catree, + db_select_delete_catree, + db_select_continue_catree, + db_select_delete_continue_catree, + db_select_count_catree, + db_select_count_continue_catree, + db_select_replace_catree, + db_select_replace_continue_catree, + db_take_catree, + db_delete_all_objects_catree, + db_free_table_catree, + db_free_table_continue_catree, + db_print_catree, + db_foreach_offheap_catree, + db_lookup_dbterm_catree, + db_finalize_dbterm_catree + +}; + +/* + * Constants + */ + +#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 200 +#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1) +#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10) +#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000 +#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000) +#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 14 + +/* + * Internal CA tree related helper functions and macros + */ + +#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term) +#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock)) +#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock)) + + +/* Helpers for reading and writing shared atomic variables */ + +/* No memory barrier */ +#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root))) +#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left))) +#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right))) +#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v)) +#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v)); +#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v)); + + +/* Release or acquire barriers */ +#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root))) +#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left))) +#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right))) +#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v)) +#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v)); +#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v)); + +/* Compares a key to the key in a route node */ +static ERTS_INLINE Sint cmp_key_route(Eterm key, + DbTableCATreeNode *obj) +{ + return CMP(key, GET_ROUTE_NODE_KEY(obj)); +} + +/* + * Used by the split_tree function + */ +static ERTS_INLINE +int less_than_two_elements(TreeDbTerm *root) +{ + return root == NULL || (root->left == NULL && root->right == NULL); +} + +/* + * Inserts a TreeDbTerm into a tree. Returns the new root. + */ +static ERTS_INLINE +TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb, + TreeDbTerm *insert_to_root, + TreeDbTerm *value_to_insert) { + /* Non recursive insertion in AVL tree, building our own stack */ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 0; + TreeDbTerm * base = insert_to_root; + TreeDbTerm **this = &base; + Sint c; + Eterm key; + int dir; + TreeDbTerm *p1, *p2, *p; + + key = GETKEY(tb, value_to_insert->dbterm.tpl); + + dstack[dpos++] = DIR_END; + for (;;) + if (!*this) { /* Found our place */ + state = 1; + *this = value_to_insert; + (*this)->balance = 0; + (*this)->left = (*this)->right = NULL; + break; + } else if ((c = cmp_key(&tb->common, key, *this)) < 0) { + /* go lefts */ + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + } else { /* go right */ + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + } + + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + p = *this; + if (dir == DIR_LEFT) { + switch (p->balance) { + case 1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = -1; + break; + case -1: /* The icky case */ + p1 = p->left; + if (p1->balance == -1) { /* Single LL rotation */ + p->left = p1->right; + p1->right = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RR rotation */ + p2 = p1->right; + p1->right = p2->left; + p2->left = p1; + p->left = p2->right; + p2->right = p; + p->balance = (p2->balance == -1) ? +1 : 0; + p1->balance = (p2->balance == 1) ? -1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } else { /* dir == DIR_RIGHT */ + switch (p->balance) { + case -1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = 1; + break; + case 1: + p1 = p->right; + if (p1->balance == 1) { /* Single RR rotation */ + p->right = p1->left; + p1->left = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RL rotation */ + p2 = p1->left; + p1->left = p2->right; + p2->right = p1; + p->right = p2->left; + p2->left = p; + p->balance = (p2->balance == 1) ? -1 : 0; + p1->balance = (p2->balance == -1) ? 1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } + } + return base; +} + +/* + * Split an AVL tree into two trees. The function stores the node + * containing the "split key" in the write back parameter + * split_key_wb. The function stores the left tree containing the keys + * that are smaller than the "split key" in the write back parameter + * left_wb and the tree containing the rest of the keys in the write + * back parameter right_wb. + */ +static void split_tree(DbTableCATree *tb, + TreeDbTerm *root, + TreeDbTerm **split_key_node_wb, + TreeDbTerm **left_wb, + TreeDbTerm **right_wb) { + TreeDbTerm * split_node = NULL; + TreeDbTerm * left_root; + TreeDbTerm * right_root; + if (root->left == NULL) { /* To get non empty split */ + *right_wb = root->right; + *split_key_node_wb = root->right; + root->right = NULL; + root->balance = 0; + *left_wb = root; + return; + } + split_node = root; + left_root = split_node->left; + split_node->left = NULL; + right_root = split_node->right; + split_node->right = NULL; + right_root = insert_TreeDbTerm(tb, right_root, split_node); + *split_key_node_wb = split_node; + *left_wb = left_root; + *right_wb = right_root; +} + +/* + * Used by the join_trees function + */ +static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root) +{ + if(root == NULL) { + return 0; + } else { + TreeDbTerm * current_node = root; + int hight_so_far = 1; + while (current_node->left != NULL || current_node->right != NULL) { + if (current_node->balance == -1) { + current_node = current_node->left; + } else { + current_node = current_node->right; + } + hight_so_far = hight_so_far + 1; + } + return hight_so_far; + } +} + +/* + * Used by the join_trees function + */ +static ERTS_INLINE +TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min) +{ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 0; + TreeDbTerm **this = root; + int dir; + TreeDbTerm *q = NULL; + + dstack[dpos++] = DIR_END; + for (;;) { + if (!*this) { /* Failure */ + return NULL; + } else if (is_min && (*this)->left != NULL) { + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + } else if (!is_min && (*this)->right != NULL) { + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + } else { /* Min value, found the one to splice out */ + q = (*this); + if (q->right == NULL) { + (*this) = q->left; + state = 1; + } else if (q->left == NULL) { + (*this) = q->right; + state = 1; + } + break; + } + } + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + if (dir == DIR_LEFT) { + state = tree_balance_left(this); + } else { + state = tree_balance_right(this); + } + } + return q; +} + +#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1) +#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0) + +/* + * Joins two AVL trees where all the keys in the left one are smaller + * then the keys in the right one and returns the resulting tree. + * + * The algorithm is described on page 474 in D. E. Knuth. The Art of + * Computer Programming: Sorting and Searching, + * vol. 3. Addison-Wesley, 2nd edition, 1998. + */ +static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, + TreeDbTerm *right_root_param) +{ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 1; + TreeDbTerm **this; + int dir; + TreeDbTerm *p1, *p2, *p; + TreeDbTerm *left_root = left_root_param; + TreeDbTerm *right_root = right_root_param; + int left_height; + int right_height; + int current_height; + dstack[dpos++] = DIR_END; + if (left_root == NULL) { + return right_root; + } else if (right_root == NULL) { + return left_root; + } + + left_height = compute_tree_hight(left_root); + right_height = compute_tree_hight(right_root); + if (left_height >= right_height) { + TreeDbTerm * new_root = + LINKOUT_MIN_TREE_NODE(&right_root); + int new_right_height = compute_tree_hight(right_root); + TreeDbTerm * current_node = left_root; + this = &left_root; + current_height = left_height; + while(current_height > new_right_height + 1) { + if (current_node->balance == -1) { + current_height = current_height - 2; + } else { + current_height = current_height - 1; + } + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + current_node = current_node->right; + } + new_root->left = current_node; + new_root->right = right_root; + new_root->balance = new_right_height - current_height; + *this = new_root; + } else { + /* This case is symmetric to the previous case */ + TreeDbTerm * new_root = + LINKOUT_MAX_TREE_NODE(&left_root); + int new_left_height = compute_tree_hight(left_root); + TreeDbTerm * current_node = right_root; + this = &right_root; + current_height = right_height; + while (current_height > new_left_height + 1) { + if (current_node->balance == 1) { + current_height = current_height - 2; + } else { + current_height = current_height - 1; + } + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + current_node = current_node->left; + } + new_root->right = current_node; + new_root->left = left_root; + new_root->balance = current_height - new_left_height; + *this = new_root; + } + /* Now we need to continue as if this was during the insert */ + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + p = *this; + if (dir == DIR_LEFT) { + switch (p->balance) { + case 1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = -1; + break; + case -1: /* The icky case */ + p1 = p->left; + if (p1->balance == -1) { /* Single LL rotation */ + p->left = p1->right; + p1->right = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RR rotation */ + p2 = p1->right; + p1->right = p2->left; + p2->left = p1; + p->left = p2->right; + p2->right = p; + p->balance = (p2->balance == -1) ? +1 : 0; + p1->balance = (p2->balance == 1) ? -1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } else { /* dir == DIR_RIGHT */ + switch (p->balance) { + case -1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = 1; + break; + case 1: + p1 = p->right; + if (p1->balance == 1) { /* Single RR rotation */ + p->right = p1->left; + p1->left = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RL rotation */ + p2 = p1->left; + p1->left = p2->right; + p2->right = p1; + p->right = p2->left; + p2->left = p; + p->balance = (p2->balance == 1) ? -1 : 0; + p1->balance = (p2->balance == -1) ? 1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } + } + /* Return the joined tree */ + if (left_height >= right_height) { + return left_root; + } else { + return right_root; + } +} + +#ifdef DEBUG +# define FORCE_RANDOM_SPLIT_JOIN +#endif +#ifdef FORCE_RANDOM_SPLIT_JOIN +static int dbg_fastrand(void) +{ + static int g_seed = 648835; + g_seed = (214013*g_seed+2531011); + return (g_seed>>16)&0x7FFF; +} + +static void dbg_maybe_force_splitjoin(DbTableCATreeNode* base_node) +{ + switch (dbg_fastrand() % 8) { + case 1: + base_node->u.base.lock_statistics = 1+ERL_DB_CATREE_HIGH_CONTENTION_LIMIT; + break; + case 2: + base_node->u.base.lock_statistics = -1+ERL_DB_CATREE_LOW_CONTENTION_LIMIT; + break; + } +} +#else +# define dbg_maybe_force_splitjoin(N) +#endif /* FORCE_RANDOM_SPLIT_JOIN */ + +static ERTS_INLINE +int try_wlock_base_node(DbTableCATreeBaseNode *base_node) +{ + return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock); +} + +/* + * Locks a base node without adjusting the lock statistics + */ +static ERTS_INLINE +void wlock_base_node_no_stats(DbTableCATreeNode *base_node) +{ + ASSERT(base_node->is_base_node); + erts_rwmtx_rwlock(&base_node->u.base.lock); +} + +/* + * Locks a base node and adjusts the lock statistics according to if + * the lock was contended or not + */ +static ERTS_INLINE +void wlock_base_node(DbTableCATreeNode *base_node) +{ + ASSERT(base_node->is_base_node); + if (try_wlock_base_node(&base_node->u.base)) { + /* The lock is contended */ + wlock_base_node_no_stats(base_node); + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; + } else { + base_node->u.base.lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; + } +} + +static ERTS_INLINE +void wunlock_base_node(DbTableCATreeNode *base_node) +{ + erts_rwmtx_rwunlock(&base_node->u.base.lock); +} + +static ERTS_INLINE +void wunlock_adapt_base_node(DbTableCATree* tb, + DbTableCATreeNode* node, + DbTableCATreeNode* parent, + int current_level) +{ + dbg_maybe_force_splitjoin(node); + if ((!node->u.base.root && parent && !(tb->common.status + & DB_CATREE_FORCE_SPLIT)) + || node->u.base.lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { + join_catree(tb, node, parent); + } + else if (node->u.base.lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT + && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { + split_catree(tb, node, parent); + } + else { + wunlock_base_node(node); + } +} + +static ERTS_INLINE +void rlock_base_node(DbTableCATreeNode *base_node) +{ + ASSERT(base_node->is_base_node); + erts_rwmtx_rlock(&base_node->u.base.lock); +} + +static ERTS_INLINE +void runlock_base_node(DbTableCATreeNode *base_node) +{ + ASSERT(base_node->is_base_node); + erts_rwmtx_runlock(&base_node->u.base.lock); +} + +static ERTS_INLINE +void lock_route_node(DbTableCATreeNode *route_node) +{ + ASSERT(!route_node->is_base_node); + erts_mtx_lock(&route_node->u.route.lock); +} + +static ERTS_INLINE +void unlock_route_node(DbTableCATreeNode *route_node) +{ + ASSERT(!route_node->is_base_node); + erts_mtx_unlock(&route_node->u.route.lock); +} + +static ERTS_INLINE +void init_root_iterator(DbTableCATree* tb, CATreeRootIterator* iter, + int read_only) +{ + iter->tb = tb; + iter->read_only = read_only; + iter->locked_bnode = NULL; + iter->next_route_key = THE_NON_VALUE; + iter->search_key = NULL; +} + +static ERTS_INLINE +void lock_iter_base_node(CATreeRootIterator* iter, + DbTableCATreeNode *base_node, + DbTableCATreeNode *parent, + int current_level) +{ + ASSERT(!iter->locked_bnode); + if (iter->read_only) + rlock_base_node(base_node); + else { + wlock_base_node(base_node); + iter->bnode_parent = parent; + iter->bnode_level = current_level; + } + iter->locked_bnode = base_node; +} + +static ERTS_INLINE +void unlock_iter_base_node(CATreeRootIterator* iter) +{ + ASSERT(iter->locked_bnode); + if (iter->read_only) + runlock_base_node(iter->locked_bnode); + else if (iter->locked_bnode->u.base.is_valid) { + wunlock_adapt_base_node(iter->tb, iter->locked_bnode, + iter->bnode_parent, iter->bnode_level); + } + else + wunlock_base_node(iter->locked_bnode); + iter->locked_bnode = NULL; +} + +static ERTS_INLINE +void destroy_root_iterator(CATreeRootIterator* iter) +{ + if (iter->locked_bnode) + unlock_iter_base_node(iter); + if (iter->search_key) + erts_free(ERTS_ALC_T_DB_TMP, iter->search_key); +} + + +typedef struct +{ + DbTableCATreeNode *parent; + int current_level; +} FindBaseNode; + +static ERTS_INLINE +DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); + if (fbn) { + fbn->parent = NULL; + fbn->current_level = 0; + } + while (!node->is_base_node) { + if (fbn) { + fbn->current_level++; + fbn->parent = node; + } + if (cmp_key_route(key, node) < 0) { + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + return node; +} + +static ERTS_INLINE +DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key) +{ + DbTableCATreeNode* base_node; + + while (1) { + base_node = find_base_node(tb, key, NULL); + rlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + runlock_base_node(base_node); + } + return base_node; +} + +static ERTS_INLINE +DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* base_node; + + while (1) { + base_node = find_base_node(tb, key, fbn); + wlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + wunlock_base_node(base_node); + } + return base_node; +} + +static ERTS_INLINE +Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size) +{ + dst->size = key_size; + if (key_size != 0) { + Eterm* hp = &dst->heap[0]; + ErlOffHeap tmp_offheap; + tmp_offheap.first = NULL; + dst->term = copy_struct(key, key_size, &hp, &tmp_offheap); + dst->oh = tmp_offheap.first; + } + else { + ASSERT(is_immed(key)); + dst->term = key; + dst->oh = NULL; + } + return dst->term; +} + +static ERTS_INLINE +void destroy_route_key(DbRouteKey* key) +{ + if (key->oh) { + ErlOffHeap oh; + oh.first = key->oh; + erts_cleanup_offheap(&oh); + } +} + + +#ifdef ERTS_ENABLE_LOCK_CHECK +# define LC_ORDER(ORDER) ORDER +#else +# define LC_ORDER(ORDER) NIL +#endif + +#define sizeof_base_node() \ + offsetof(DbTableCATreeNode, u.base.end_of_struct__) + +static DbTableCATreeNode *create_base_node(DbTableCATree *tb, + TreeDbTerm* root) +{ + DbTableCATreeNode *p; + erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; + p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb, + sizeof_base_node()); + + p->is_base_node = 1; + p->u.base.root = root; + if (tb->common.type & DB_FREQ_READ) + rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; + if (erts_ets_rwmtx_spin_count >= 0) + rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; + + erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt, + "erl_db_catree_base_node", + NIL, + ERTS_LOCK_FLAGS_CATEGORY_DB); + p->u.base.lock_statistics = ((tb->common.status & DB_CATREE_FORCE_SPLIT) + ? INT_MAX : 0); + p->u.base.is_valid = 1; + return p; +} + +static ERTS_INLINE Uint sizeof_route_node(Uint key_size) +{ + return (offsetof(DbTableCATreeNode, u.route.key.heap) + + key_size*sizeof(Eterm)); +} + +static DbTableCATreeNode* +create_route_node(DbTableCATree *tb, + DbTableCATreeNode *left, + DbTableCATreeNode *right, + DbTerm * keyTerm, + DbTableCATreeNode* lc_parent) +{ + Eterm key = GETKEY(tb,keyTerm->tpl); + int key_size = size_object(key); + DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, + (DbTable *) tb, + sizeof_route_node(key_size)); + + copy_route_key(&p->u.route.key, key, key_size); + p->is_base_node = 0; + p->u.route.is_valid = 1; + erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left); + erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right); +#ifdef ERTS_ENABLE_LOCK_CHECK + /* Route node lock order is inverse tree depth (from leafs toward root) */ + p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL : + lc_parent->u.route.lc_order - 1); + /* + * This assert may eventually fail as we don't increase 'lc_order' in join + * operations when route nodes move up in the tree. + * Tough luck if you run a lock-checking VM for such a long time on 32-bit. + */ + ERTS_LC_ASSERT(p->u.route.lc_order >= 0); +#endif + erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node", + LC_ORDER(make_small(p->u.route.lc_order)), + ERTS_LOCK_FLAGS_CATEGORY_DB); + return p; +} + +static void do_free_base_node(void* vptr) +{ + DbTableCATreeNode *p = (DbTableCATreeNode *)vptr; + ASSERT(p->is_base_node); + erts_rwmtx_destroy(&p->u.base.lock); + erts_free(ERTS_ALC_T_DB_TABLE, p); +} + +static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p) +{ + ASSERT(p->is_base_node); + ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(), 0); + do_free_base_node(p); +} + +static void do_free_route_node(void *vptr) +{ + DbTableCATreeNode *p = (DbTableCATreeNode *)vptr; + ASSERT(!p->is_base_node); + erts_mtx_destroy(&p->u.route.lock); + destroy_route_key(&p->u.route.key); + erts_free(ERTS_ALC_T_DB_TABLE, p); +} + +static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p) +{ + ASSERT(!p->is_base_node); + ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0); + do_free_route_node(p); +} + + +/* + * Returns the parent routing node of the specified + * route node 'child' if such a parent exists + * or NULL if 'child' is attached to the root. + */ +static ERTS_INLINE DbTableCATreeNode * +parent_of(DbTableCATree *tb, + DbTableCATreeNode *child) +{ + Eterm key = GET_ROUTE_NODE_KEY(child); + DbTableCATreeNode *current = GET_ROOT_ACQB(tb); + DbTableCATreeNode *prev = NULL; + + while (current != child) { + prev = current; + if (cmp_key_route(key, current) < 0) { + current = GET_LEFT_ACQB(current); + } else { + current = GET_RIGHT_ACQB(current); + } + } + return prev; +} + + +static ERTS_INLINE DbTableCATreeNode * +leftmost_base_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + while (!node->is_base_node) { + node = GET_LEFT_ACQB(node); + } + return node; +} + + +static ERTS_INLINE DbTableCATreeNode * +rightmost_base_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + while (!node->is_base_node) { + node = GET_RIGHT_ACQB(node); + } + return node; +} + + +static ERTS_INLINE DbTableCATreeNode * +leftmost_route_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + DbTableCATreeNode *prev_node = NULL; + while (!node->is_base_node) { + prev_node = node; + node = GET_LEFT_ACQB(node); + } + return prev_node; +} + +static ERTS_INLINE DbTableCATreeNode* +rightmost_route_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode * node = root; + DbTableCATreeNode * prev_node = NULL; + while (!node->is_base_node) { + prev_node = node; + node = GET_RIGHT_ACQB(node); + } + return prev_node; +} + +static ERTS_INLINE +void init_tree_stack(DbTreeStack *stack, + TreeDbTerm **stack_array, + Uint init_slot) +{ + stack->array = stack_array; + stack->pos = 0; + stack->slot = init_slot; +} + +static void join_catree(DbTableCATree *tb, + DbTableCATreeNode *thiz, + DbTableCATreeNode *parent) +{ + DbTableCATreeNode *gparent; + DbTableCATreeNode *neighbor; + DbTableCATreeNode *new_neighbor; + DbTableCATreeNode *neighbor_parent; + + ASSERT(thiz->is_base_node); + if (parent == NULL) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(thiz); + return; + } + ASSERT(!parent->is_base_node); + if (GET_LEFT(parent) == thiz) { + neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent)); + if (try_wlock_base_node(&neighbor->u.base)) { + /* Failed to acquire lock */ + thiz->u.base.lock_statistics = 0; + wunlock_base_node(thiz); + return; + } else if (!neighbor->u.base.is_valid) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(thiz); + wunlock_base_node(neighbor); + return; + } else { + lock_route_node(parent); + parent->u.route.is_valid = 0; + neighbor->u.base.is_valid = 0; + thiz->u.base.is_valid = 0; + gparent = NULL; + do { + if (gparent != NULL) { + unlock_route_node(gparent); + } + gparent = parent_of(tb, parent); + if (gparent != NULL) + lock_route_node(gparent); + } while (gparent != NULL && !gparent->u.route.is_valid); + + if (gparent == NULL) { + SET_ROOT_RELB(tb, GET_RIGHT(parent)); + } else if (GET_LEFT(gparent) == parent) { + SET_LEFT_RELB(gparent, GET_RIGHT(parent)); + } else { + SET_RIGHT_RELB(gparent, GET_RIGHT(parent)); + } + unlock_route_node(parent); + if (gparent != NULL) { + unlock_route_node(gparent); + } + { + TreeDbTerm* new_root = join_trees(thiz->u.base.root, + neighbor->u.base.root); + new_neighbor = create_base_node(tb, new_root); + } + if (GET_RIGHT(parent) == neighbor) { + neighbor_parent = gparent; + } else { + neighbor_parent = leftmost_route_node(GET_RIGHT(parent)); + } + } + } else { /* Symetric case */ + ASSERT(GET_RIGHT(parent) == thiz); + neighbor = rightmost_base_node(GET_LEFT_ACQB(parent)); + if (try_wlock_base_node(&neighbor->u.base)) { + /* Failed to acquire lock */ + thiz->u.base.lock_statistics = 0; + wunlock_base_node(thiz); + return; + } else if (!neighbor->u.base.is_valid) { + thiz->u.base.lock_statistics = 0; + wunlock_base_node(thiz); + wunlock_base_node(neighbor); + return; + } else { + lock_route_node(parent); + parent->u.route.is_valid = 0; + neighbor->u.base.is_valid = 0; + thiz->u.base.is_valid = 0; + gparent = NULL; + do { + if (gparent != NULL) { + unlock_route_node(gparent); + } + gparent = parent_of(tb, parent); + if (gparent != NULL) { + lock_route_node(gparent); + } else { + gparent = NULL; + } + } while (gparent != NULL && !gparent->u.route.is_valid); + if (gparent == NULL) { + SET_ROOT_RELB(tb, GET_LEFT(parent)); + } else if (GET_RIGHT(gparent) == parent) { + SET_RIGHT_RELB(gparent, GET_LEFT(parent)); + } else { + SET_LEFT_RELB(gparent, GET_LEFT(parent)); + } + unlock_route_node(parent); + if (gparent != NULL) { + unlock_route_node(gparent); + } + { + TreeDbTerm* new_root = join_trees(neighbor->u.base.root, + thiz->u.base.root); + new_neighbor = create_base_node(tb, new_root); + } + if (GET_LEFT(parent) == neighbor) { + neighbor_parent = gparent; + } else { + neighbor_parent = + rightmost_route_node(GET_LEFT(parent)); + } + } + } + /* Link in new neighbor and free nodes that are no longer in the tree */ + if (neighbor_parent == NULL) { + SET_ROOT_RELB(tb, new_neighbor); + } else if (GET_LEFT(neighbor_parent) == neighbor) { + SET_LEFT_RELB(neighbor_parent, new_neighbor); + } else { + SET_RIGHT_RELB(neighbor_parent, new_neighbor); + } + wunlock_base_node(thiz); + wunlock_base_node(neighbor); + /* Free the parent and base */ + erts_schedule_db_free(&tb->common, + do_free_route_node, + parent, + &parent->u.route.free_item, + sizeof_route_node(parent->u.route.key.size)); + erts_schedule_db_free(&tb->common, + do_free_base_node, + thiz, + &thiz->u.base.free_item, + sizeof_base_node()); + erts_schedule_db_free(&tb->common, + do_free_base_node, + neighbor, + &neighbor->u.base.free_item, + sizeof_base_node()); +} + +static void split_catree(DbTableCATree *tb, + DbTableCATreeNode* ERTS_RESTRICT base, + DbTableCATreeNode* ERTS_RESTRICT parent) +{ + TreeDbTerm *splitOutWriteBack; + DbTableCATreeNode* ERTS_RESTRICT new_left; + DbTableCATreeNode* ERTS_RESTRICT new_right; + DbTableCATreeNode* ERTS_RESTRICT new_route; + + if (less_than_two_elements(base->u.base.root)) { + if (!(tb->common.status & DB_CATREE_FORCE_SPLIT)) + base->u.base.lock_statistics = 0; + wunlock_base_node(base); + return; + } else { + TreeDbTerm *left_tree; + TreeDbTerm *right_tree; + + split_tree(tb, base->u.base.root, &splitOutWriteBack, + &left_tree, &right_tree); + + new_left = create_base_node(tb, left_tree); + new_right = create_base_node(tb, right_tree); + new_route = create_route_node(tb, + new_left, + new_right, + &splitOutWriteBack->dbterm, + parent); + if (parent == NULL) { + SET_ROOT_RELB(tb, new_route); + } else if(GET_LEFT(parent) == base) { + SET_LEFT_RELB(parent, new_route); + } else { + SET_RIGHT_RELB(parent, new_route); + } + base->u.base.is_valid = 0; + wunlock_base_node(base); + erts_schedule_db_free(&tb->common, + do_free_base_node, + base, + &base->u.base.free_item, + sizeof_base_node()); + } +} + +/* + * Helper functions for removing the table + */ + +static void catree_add_base_node_to_free_list( + DbTableCATree *tb, + DbTableCATreeNode *base_node_container) +{ + base_node_container->u.base.next = + tb->base_nodes_to_free_list; + tb->base_nodes_to_free_list = base_node_container; +} + +static void catree_deque_base_node_from_free_list(DbTableCATree *tb) +{ + if (tb->base_nodes_to_free_list == NULL) { + return; /* List empty */ + } else { + DbTableCATreeNode *first = tb->base_nodes_to_free_list; + tb->base_nodes_to_free_list = first->u.base.next; + } +} + +static DbTableCATreeNode *catree_first_base_node_from_free_list( + DbTableCATree *tb) +{ + return tb->base_nodes_to_free_list; +} + +static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left) +{ + DbTableCATreeNode *root; + DbTableCATreeNode *p; + for (;;) { + root = POP_NODE(&tb->free_stack_rnodes); + if (root == NULL) break; + else if(root->is_base_node) { + catree_add_base_node_to_free_list(tb, root); + break; + } + for (;;) { + if ((GET_LEFT(root) != NULL) && + (p = GET_LEFT(root))->is_base_node) { + SET_LEFT(root, NULL); + catree_add_base_node_to_free_list(tb, p); + } else if ((GET_RIGHT(root) != NULL) && + (p = GET_RIGHT(root))->is_base_node) { + SET_RIGHT(root, NULL); + catree_add_base_node_to_free_list(tb, p); + } else if ((p = GET_LEFT(root)) != NULL) { + SET_LEFT(root, NULL); + PUSH_NODE(&tb->free_stack_rnodes, root); + root = p; + } else if ((p = GET_RIGHT(root)) != NULL) { + SET_RIGHT(root, NULL); + PUSH_NODE(&tb->free_stack_rnodes, root); + root = p; + } else { + free_catree_route_node(tb, root); + if (--num_left >= 0) { + break; + } else { + return num_left; /* Done enough for now */ + } + } + } + } + return num_left; +} + +static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left) +{ + TreeDbTerm *root; + TreeDbTerm *p; + DbTableCATreeNode *base_node_container = + catree_first_base_node_from_free_list(tb); + for (;;) { + root = POP_NODE(&tb->free_stack_elems); + if (root == NULL) break; + for (;;) { + if ((p = root->left) != NULL) { + root->left = NULL; + PUSH_NODE(&tb->free_stack_elems, root); + root = p; + } else if ((p = root->right) != NULL) { + root->right = NULL; + PUSH_NODE(&tb->free_stack_elems, root); + root = p; + } else { + free_term((DbTable*)tb, root); + if (--num_left >= 0) { + break; + } else { + return num_left; /* Done enough for now */ + } + } + } + } + catree_deque_base_node_from_free_list(tb); + free_catree_base_node(tb, base_node_container); + base_node_container = catree_first_base_node_from_free_list(tb); + if (base_node_container != NULL) { + PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root); + } + return num_left; +} + + +/* +** Initialization function +*/ + +void db_initialize_catree(void) +{ + return; +}; + +/* +** Table interface routines (i.e., what's called by the bif's) +*/ + +int db_create_catree(Process *p, DbTable *tbl) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode *root; + + root = create_base_node(tb, NULL); + tb->deletion = 0; + tb->base_nodes_to_free_list = NULL; + erts_atomic_init_relb(&(tb->root), (erts_aint_t)root); + return DB_ERROR_NONE; +} + +static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret) +{ + TreeDbTerm *root; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + root = *catree_find_first_root(&iter); + if (!root) { + TreeDbTerm **pp = catree_find_next_root(&iter, NULL); + root = pp ? *pp : NULL; + } + + result = db_first_tree_common(p, tbl, root, ret, NULL); + + destroy_root_iterator(&iter); + return result; +} + +static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + TreeDbTerm **rootp; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + iter.next_route_key = key; + rootp = catree_find_next_root(&iter, NULL); + + do { + init_tree_stack(&stack, stack_array, 0); + result = db_next_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, &stack); + if (result != DB_ERROR_NONE || *ret != am_EOT) + break; + + rootp = catree_find_next_root(&iter, NULL); + } while (rootp); + + destroy_root_iterator(&iter); + return result; +} + +static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret) +{ + TreeDbTerm *root; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + root = *catree_find_last_root(&iter); + if (!root) { + TreeDbTerm **pp = catree_find_prev_root(&iter, NULL); + root = pp ? *pp : NULL; + } + + result = db_last_tree_common(p, tbl, root, ret, NULL); + + destroy_root_iterator(&iter); + return result; +} + +static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + TreeDbTerm **rootp; + CATreeRootIterator iter; + int result; + + init_root_iterator(&tbl->catree, &iter, 1); + iter.next_route_key = key; + rootp = catree_find_prev_root(&iter, NULL); + + do { + init_tree_stack(&stack, stack_array, 0); + result = db_prev_tree_common(p, tbl, (rootp ? *rootp : NULL), key, ret, + &stack); + if (result != DB_ERROR_NONE || *ret != am_EOT) + break; + rootp = catree_find_prev_root(&iter, NULL); + } while (rootp); + + destroy_root_iterator(&iter); + return result; +} + +static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail) +{ + DbTableCATree *tb = &tbl->catree; + Eterm key = GETKEY(&tb->common, tuple_val(obj)); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_put_tree_common(&tb->common, &node->u.base.root, obj, + key_clash_fail, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; +} + +static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_tree_common(p, &tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; +} + +TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) +{ + FindBaseNode fbn; + DbTableCATreeNode* base_node; + + while (1) { + base_node = find_base_node(iter->tb, key, &fbn); + lock_iter_base_node(iter, base_node, fbn.parent, fbn.current_level); + if (base_node->u.base.is_valid) + break; + unlock_iter_base_node(iter); + } + return &base_node->u.base.root; +} + +static Eterm save_iter_search_key(CATreeRootIterator* iter, Eterm key) +{ + Uint key_size; + + if (is_immed(key)) + return key; + + if (iter->search_key) { + if (key == iter->search_key->term) + return key; /* already saved */ + destroy_route_key(iter->search_key); + } + key_size = size_object(key); + if (!iter->search_key || key_size > iter->search_key->size) { + iter->search_key = erts_realloc(ERTS_ALC_T_DB_TMP, + iter->search_key, + (offsetof(DbRouteKey, heap) + + key_size*sizeof(Eterm))); + } + return copy_route_key(iter->search_key, key, key_size); +} + +TreeDbTerm** catree_find_nextprev_root(CATreeRootIterator *iter, + int forward, + Eterm *search_keyp) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_invalid = NULL; + DbTableCATreeNode *rejected_empty = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + Eterm route_key = iter->next_route_key; + int current_level; + + if (iter->locked_bnode) { + if (search_keyp) + *search_keyp = save_iter_search_key(iter, *search_keyp); + unlock_iter_base_node(iter); + } + + if (is_non_value(route_key)) + return NULL; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (forward) { + if (cmp_key_route(route_key,node) < 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + else { + if (cmp_key_route(route_key,node) > 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + } + ASSERT(node != rejected_invalid); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + ASSERT(node != rejected_empty); + if (node->u.base.root) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + iter->locked_bnode = node; + return &node->u.base.root; + } + if (!next_route_node) { + unlock_iter_base_node(iter); + return NULL; + } + route_key = next_route_node->u.route.key.term; + IF_DEBUG(rejected_empty = node); + } + else + IF_DEBUG(rejected_invalid = node); + + /* Retry */ + unlock_iter_base_node(iter); + } +} + +TreeDbTerm** catree_find_next_root(CATreeRootIterator *iter, Eterm* keyp) +{ + return catree_find_nextprev_root(iter, 1, keyp); +} + +TreeDbTerm** catree_find_prev_root(CATreeRootIterator *iter, Eterm* keyp) +{ + return catree_find_nextprev_root(iter, 0, keyp); +} + +/* @brief Find root of tree where object with smallest key of all larger than + * partially bound key may reside. Can be used as a starting point for + * a reverse iteration with pb_key. + * + * @param pb_key The partially bound key. Example {42, '$1'} + * @param iter An initialized root iterator. + * + * @return Pointer to found root pointer. May not be NULL. + */ +TreeDbTerm** catree_find_next_from_pb_key_root(Eterm pb_key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + int current_level; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (cmp_partly_bound(pb_key, GET_ROUTE_NODE_KEY(node)) >= 0) { + next_route_node = node; + node = GET_RIGHT_ACQB(node); + } else { + node = GET_LEFT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +/* @brief Find root of tree where object with largest key of all smaller than + * partially bound key may reside. Can be used as a starting point for + * a forward iteration with pb_key. + * + * @param pb_key The partially bound key. Example {42, '$1'} + * @param iter An initialized root iterator. + * + * @return Pointer to found root pointer. May not be NULL. + */ +TreeDbTerm** catree_find_prev_from_pb_key_root(Eterm key, + CATreeRootIterator* iter) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode *parent; + DbTableCATreeNode* next_route_node; + int current_level; + + ASSERT(!iter->locked_bnode); + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + parent = NULL; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + parent = node; + if (cmp_partly_bound(key, GET_ROUTE_NODE_KEY(node)) <= 0) { + next_route_node = node; + node = GET_LEFT_ACQB(node); + } else { + node = GET_RIGHT_ACQB(node); + } + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, parent, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +static TreeDbTerm** catree_find_firstlast_root(CATreeRootIterator* iter, + int first) +{ +#ifdef DEBUG + DbTableCATreeNode *rejected_base = NULL; +#endif + DbTableCATreeNode *node; + DbTableCATreeNode* next_route_node; + int current_level; + + while (1) { + node = GET_ROOT_ACQB(iter->tb); + current_level = 0; + next_route_node = NULL; + while (!node->is_base_node) { + current_level++; + next_route_node = node; + node = first ? GET_LEFT_ACQB(node) : GET_RIGHT_ACQB(node); + } + ASSERT(node != rejected_base); + lock_iter_base_node(iter, node, next_route_node, current_level); + if (node->u.base.is_valid) { + iter->next_route_key = (next_route_node ? + next_route_node->u.route.key.term : + THE_NON_VALUE); + return &node->u.base.root; + } + /* Retry */ + unlock_iter_base_node(iter); +#ifdef DEBUG + rejected_base = node; +#endif + } +} + +TreeDbTerm** catree_find_first_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 1); +} + +TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter) +{ + return catree_find_firstlast_root(iter, 0); +} + +static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_member_tree_common(&tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; +} + +static int db_get_element_catree(Process *p, DbTable *tbl, + Eterm key, int ndex, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_element_tree_common(p, &tb->common, + node->u.base.root, + key, ndex, ret, NULL); + runlock_base_node(node); + return result; +} + +static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_tree_common(tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; +} + +static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + Eterm key = GETKEY(&tb->common, tuple_val(object)); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_object_tree_common(tbl, + &node->u.base.root, + object, + ret, + NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; +} + + +static int db_slot_catree(Process *p, DbTable *tbl, + Eterm slot_term, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_slot_tree_common(p, tbl, *catree_find_first_root(&iter), + slot_term, ret, NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_continue_tree_common(p, &tbl->common, + continuation, ret, NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, int reverse, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_tree_common(p, tbl, tid, pattern, reverse, ret, + NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_count_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_continue_tree_common(p, tbl, + continuation, ret, NULL, + &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_count_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, + int reversed, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 1); + result = db_select_chunk_tree_common(p, tbl, + tid, pattern, chunk_size, reversed, ret, + NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_delete_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + init_tree_stack(&stack, stack_array, 0); + result = db_select_delete_continue_tree_common(p, tbl, continuation, ret, + &stack, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + init_tree_stack(&stack, stack_array, 0); + result = db_select_delete_tree_common(p, tbl, + tid, pattern, ret, &stack, + &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_tree_common(p, tbl, + tid, pattern, ret, NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_select_replace_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret) +{ + int result; + CATreeRootIterator iter; + + init_root_iterator(&tbl->catree, &iter, 0); + result = db_select_replace_continue_tree_common(p, tbl, continuation, ret, + NULL, &iter); + destroy_root_iterator(&iter); + return result; +} + +static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_take_tree_common(p, tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; +} + +/* +** Other interface routines (not directly coupled to one bif) +*/ + + +/* Display tree contents (for dump) */ +static void db_print_catree(fmtfn_t to, void *to_arg, + int show, DbTable *tbl) +{ + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(&tbl->catree, &iter, 1); + root = catree_find_first_root(&iter); + do { + db_print_tree_common(to, to_arg, show, *root, tbl); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); +} + +/* Release all memory occupied by a single table */ +static int db_free_table_catree(DbTable *tbl) +{ + while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0) + ; + return 1; +} + +static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds) +{ + DbTableCATreeNode *first_base_node; + DbTableCATree *tb = &tbl->catree; + if (!tb->deletion) { + tb->deletion = 1; + tb->free_stack_elems.array = + erts_db_alloc(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + sizeof(TreeDbTerm *) * STACK_NEED); + tb->free_stack_elems.pos = 0; + tb->free_stack_elems.slot = 0; + tb->free_stack_rnodes.array = + erts_db_alloc(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + sizeof(DbTableCATreeNode *) * STACK_NEED); + tb->free_stack_rnodes.pos = 0; + tb->free_stack_rnodes.size = STACK_NEED; + PUSH_NODE(&tb->free_stack_rnodes, GET_ROOT(tb)); + tb->is_routing_nodes_freed = 0; + tb->base_nodes_to_free_list = NULL; + } + if ( ! tb->is_routing_nodes_freed ) { + reds = do_free_routing_nodes_catree_cont(tb, reds); + if (reds < 0) { + return reds; /* Not finished */ + } else { + tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */ + first_base_node = catree_first_base_node_from_free_list(tb); + PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root); + } + } + while (catree_first_base_node_from_free_list(tb) != NULL) { + reds = do_free_base_node_cont(tb, reds); + if (reds < 0) { + return reds; /* Continue later */ + } + } + /* Time to free the main structure*/ + erts_db_free(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + (void *) tb->free_stack_elems.array, + sizeof(TreeDbTerm *) * STACK_NEED); + erts_db_free(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + (void *) tb->free_stack_rnodes.array, + sizeof(DbTableCATreeNode *) * STACK_NEED); + return 1; +} + +static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds) +{ + reds = db_free_table_continue_catree(tbl, reds); + if (reds < 0) + return reds; + db_create_catree(p, tbl); + erts_atomic_set_nob(&tbl->catree.common.nitems, 0); + return reds; +} + + +static void db_foreach_offheap_catree(DbTable *tbl, + void (*func)(ErlOffHeap *, void *), + void *arg) +{ + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(&tbl->catree, &iter, 1); + root = catree_find_first_root(&iter); + do { + db_foreach_offheap_tree_common(*root, func, arg); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); +} + +static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj, + DbUpdateHandle *handle) +{ + DbTableCATree *tb = &tbl->catree; + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key, + obj, handle, NULL); + if (res == 0) { + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + } else { + /* db_finalize_dbterm_catree will unlock */ + handle->u.catree.base_node = node; + handle->u.catree.parent = fbn.parent; + handle->u.catree.current_level = fbn.current_level; + } + return res; +} + +static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle) +{ + DbTableCATree *tb = &(handle->tb->catree); + db_finalize_dbterm_tree_common(cret, handle, NULL); + wunlock_adapt_base_node(tb, handle->u.catree.base_node, + handle->u.catree.parent, + handle->u.catree.current_level); + return; +} + +#ifdef ERTS_ENABLE_LOCK_COUNT +static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb, + DbTableCATreeNode *node, + int enable) +{ + erts_lcnt_ref_t *lcnt_ref; + erts_lock_flags_t lock_type; + if (node->is_base_node) { + lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt; + lock_type = ERTS_LOCK_TYPE_RWMUTEX; + } else { + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable); + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable); + lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt; + lock_type = ERTS_LOCK_TYPE_MUTEX; + } + if (enable) { + erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name, + lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB); + } else { + erts_lcnt_uninstall(lcnt_ref); + } +} + +void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable) +{ + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable); +} +#endif /* ERTS_ENABLE_LOCK_COUNT */ + +void db_catree_force_split(DbTableCATree* tb, int on) +{ + CATreeRootIterator iter; + TreeDbTerm** root; + + init_root_iterator(tb, &iter, 1); + root = catree_find_first_root(&iter); + do { + iter.locked_bnode->u.base.lock_statistics = (on ? INT_MAX : 0); + root = catree_find_next_root(&iter, NULL); + } while (root); + destroy_root_iterator(&iter); + + if (on) + tb->common.status |= DB_CATREE_FORCE_SPLIT; + else + tb->common.status &= ~DB_CATREE_FORCE_SPLIT; +} + +void db_calc_stats_catree(DbTableCATree* tb, DbCATreeStats* stats) +{ + DbTableCATreeNode* stack[ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT]; + DbTableCATreeNode* node; + Uint depth = 0; + + stats->route_nodes = 0; + stats->base_nodes = 0; + stats->max_depth = 0; + + node = GET_ROOT(tb); + do { + while (!node->is_base_node) { + stats->route_nodes++; + ASSERT(depth < sizeof(stack)/sizeof(*stack)); + stack[depth++] = node; /* PUSH parent */ + if (stats->max_depth < depth) + stats->max_depth = depth; + node = GET_LEFT(node); + } + stats->base_nodes++; + + while (depth > 0) { + DbTableCATreeNode* parent = stack[depth-1]; + if (node == GET_LEFT(parent)) { + node = GET_RIGHT(parent); + break; + } + else { + ASSERT(node == GET_RIGHT(parent)); + node = parent; + depth--; /* POP parent */ + } + } + } while (depth > 0); +} + +#ifdef HARDDEBUG + +/* + * Not called, but kept as it might come to use + */ +static inline int my_check_table_tree(TreeDbTerm *t) +{ + int lh, rh; + if (t == NULL) + return 0; + lh = my_check_table_tree(t->left); + rh = my_check_table_tree(t->right); + if ((rh - lh) != t->balance) { + erts_fprintf(stderr, "Invalid tree balance for this node:\n"); + erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n", + t->balance, t->left, t->right); + erts_fprintf(stderr,"\nDump:\n---------------------------------\n"); + erts_fprintf(stderr,"\n---------------------------------\n"); + abort(); + } + return ((rh > lh) ? rh : lh) + 1; +} + +#endif |