diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/Makefile.in | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db.c | 40 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 2174 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.h | 91 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 914 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree_util.h | 151 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_util.h | 29 | ||||
-rw-r--r-- | erts/emulator/beam/erl_lock_check.c | 54 | ||||
-rw-r--r-- | erts/emulator/beam/erl_lock_check.h | 4 |
10 files changed, 3064 insertions, 397 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in index 054692819e..eeb9577a33 100644 --- a/erts/emulator/Makefile.in +++ b/erts/emulator/Makefile.in @@ -874,7 +874,7 @@ RUN_OBJS += \ $(OBJDIR)/erl_thr_queue.o $(OBJDIR)/erl_sched_spec_pre_alloc.o \ $(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \ $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \ - $(OBJDIR)/erl_io_queue.o + $(OBJDIR)/erl_io_queue.o $(OBJDIR)/erl_db_catree.o LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o NIF_OBJS = \ diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index c009a3bde8..1df972f4b6 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -90,7 +90,8 @@ enum DbIterSafety { ITER_SAFE /* No need to fixate at all */ }; # define ITERATION_SAFETY(Proc,Tab) \ - ((IS_TREE_TABLE((Tab)->common.status) || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \ + ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \ + || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \ : (((Tab)->common.status & DB_FINE_LOCKED) ? ITER_UNSAFE : ITER_SAFE_LOCKED)) #define DID_TRAP(P,Ret) (!is_value(Ret) && ((P)->freason == TRAP)) @@ -359,6 +360,7 @@ typedef enum { extern DbTableMethod db_hash; extern DbTableMethod db_tree; +extern DbTableMethod db_catree; int user_requested_db_max_tabs; int erts_ets_realloc_always_moves; @@ -414,6 +416,15 @@ free_dbtable(void *vtb) tb->common.fixations); } #endif + if (erts_atomic_read_nob(&tb->common.memory_size) > sizeof(DbTable)) { + /* The CA tree implementation use delayed freeing and the DbTable needs to + be freed after all other memory blocks that are allocated by the table. */ + erts_schedule_thr_prgr_later_cleanup_op(free_dbtable, + (void *) tb, + &tb->release.data, + sizeof(DbTable)); + return; + } erts_rwmtx_destroy(&tb->common.rwlock); erts_mtx_destroy(&tb->common.fixlock); ASSERT(is_immed(tb->common.heir_data)); @@ -1076,7 +1087,7 @@ BIF_RETTYPE ets_update_element_3(BIF_ALIST_3) DB_BIF_GET_TABLE(tb, DB_WRITE, LCK_WRITE_REC, BIF_ets_update_element_3); UseTmpHeap(2,BIF_P); - if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) { + if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) { goto bail_out; } if (is_tuple(BIF_ARG_3)) { @@ -1165,7 +1176,7 @@ do_update_counter(Process *p, DbTable* tb, UseTmpHeap(5, p); - if (!(tb->common.status & (DB_SET | DB_ORDERED_SET))) { + if (!(tb->common.status & (DB_SET | DB_ORDERED_SET | DB_CA_ORDERED_SET))) { goto bail_out; } if (is_integer(arg3)) { /* Incr */ @@ -1647,15 +1658,15 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) val = CAR(list_val(list)); if (val == am_bag) { status |= DB_BAG; - status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET); + status &= ~(DB_SET | DB_DUPLICATE_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET); } else if (val == am_duplicate_bag) { status |= DB_DUPLICATE_BAG; - status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET); + status &= ~(DB_SET | DB_BAG | DB_ORDERED_SET | DB_CA_ORDERED_SET); } else if (val == am_ordered_set) { status |= DB_ORDERED_SET; - status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG); + status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_CA_ORDERED_SET); } else if (is_tuple(val)) { Eterm *tp = tuple_val(val); @@ -1716,7 +1727,13 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) if (is_not_nil(list)) { /* bad opt or not a well formed list */ BIF_ERROR(BIF_P, BADARG); } - if (IS_HASH_TABLE(status)) { + if (IS_TREE_TABLE(status) && is_fine_locked && !(status & DB_PRIVATE)) { + meth = &db_catree; + status |= DB_CA_ORDERED_SET; + status &= ~(DB_SET | DB_BAG | DB_DUPLICATE_BAG | DB_ORDERED_SET); + status |= DB_FINE_LOCKED; + } + else if (IS_HASH_TABLE(status)) { meth = &db_hash; if (is_fine_locked && !(status & DB_PRIVATE)) { status |= DB_FINE_LOCKED; @@ -3506,6 +3523,7 @@ void init_db(ErtsDbSpinCount db_spin_count) db_initialize_hash(); db_initialize_tree(); + db_initialize_catree(); /* Non visual BIF to trap to. */ erts_init_trap_export(&ets_select_delete_continue_exp, @@ -4114,6 +4132,8 @@ static Eterm table_info(Process* p, DbTable* tb, Eterm What) ret = am_duplicate_bag; } else if (tb->common.status & DB_ORDERED_SET) { ret = am_ordered_set; + } else if (tb->common.status & DB_CA_ORDERED_SET) { + ret = am_ordered_set; } else { /*TT*/ ASSERT(tb->common.status & DB_BAG); ret = am_bag; @@ -4409,6 +4429,12 @@ void erts_lcnt_enable_db_lock_count(DbTable *tb, int enable) { if(IS_HASH_TABLE(tb->common.status)) { erts_lcnt_enable_db_hash_lock_count(&tb->hash, enable); + } else if(IS_CATREE_TABLE(tb->common.status)) { + /* erts_lcnt_enable_db_catree_lock_count is not thread safe so + the table needs to get locked */ + db_lock(tb, LCK_WRITE); + erts_lcnt_enable_db_catree_lock_count(&tb->catree, enable); + db_unlock(tb, LCK_WRITE); } } diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h index 23975d208f..45d120ac0e 100644 --- a/erts/emulator/beam/erl_db.h +++ b/erts/emulator/beam/erl_db.h @@ -66,6 +66,7 @@ typedef struct { #include "erl_db_util.h" /* Flags */ #include "erl_db_hash.h" /* DbTableHash */ #include "erl_db_tree.h" /* DbTableTree */ +#include "erl_db_catree.h" /* DbTableCATree */ /*TT*/ Uint erts_get_ets_misc_mem_size(void); @@ -90,6 +91,7 @@ union db_table { DbTableCommon common; /* Any type of db table */ DbTableHash hash; /* Linear hash array specific data */ DbTableTree tree; /* AVL tree specific data */ + DbTableCATree catree; /* CA tree specific data */ DbTableRelease release; /*TT*/ }; diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c new file mode 100644 index 0000000000..37a299df35 --- /dev/null +++ b/erts/emulator/beam/erl_db_catree.c @@ -0,0 +1,2174 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB and Kjell Winblad 1998-2018. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Implementation of ETS ordered_set table type with + * fine-grained synchronization. + * + * Author: Kjell Winblad + * + * This implementation is based on the contention adapting search tree + * (CA tree). The CA tree is a concurrent data structure that + * dynamically adapts its synchronization granularity based on how + * much contention is detected in locks. The following publication + * contains a detailed description of CA trees: + * + * A Contention Adapting Approach to Concurrent Ordered Sets + * Journal of Parallel and Distributed Computing, 2018 + * Kjell Winblad and Konstantinos Sagonas + * https://doi.org/10.1016/j.jpdc.2017.11.007 + * + * The following publication may also be interesting as it discusses + * how the CA tree can be used as an ETS ordered_set table type + * backend: + * + * More Scalable Ordered Set for ETS Using Adaptation + * In Thirteenth ACM SIGPLAN workshop on Erlang (2014) + * Kjell Winblad and Konstantinos Sagonas + * https://doi.org/10.1145/2633448.2633455 + * + * This implementation of the ordered_set ETS table type is only + * activated when the options {write_concurrency, true}, public and + * ordered_set are passed to the ets:new/2 function. This + * implementation is expected to scale better than the default + * implementation (located in "erl_db_tree.c") when concurrent + * processes use the following ETS operations to operate on a table: + * + * delete/2, delete_object/2, first/1, insert/2 (single object), + * insert_new/2 (single object), lookup/2, lookup_element/2, member/2, + * next/2, take/2 and update_element/3 (single object). + * + * Currently, the implementation does not have scalable support for + * the other operations (e.g., select/2). These operations are handled + * by merging all locks so that all terms get protected by a single + * lock. This implementation may thus perform worse than the default + * implementation in some scenarios. For example, when concurrent + * processes access a table with the operations insert/2, delete/2 and + * select/2, the insert/2 and delete/2 operations will trigger splits + * of locks (to get more fine-grained synchronization) but this will + * quickly be undone by the select/2 operation if this operation is + * also called frequently. + * + * The default implementation has a static stack optimization (see + * get_static_stack in erl_db_tree.c). This implementation does not + * have such an optimization as it induces bad scalability when + * concurrent read operations are frequent (they all try to get hold + * of the same stack). The default implementation may thus perform + * better compared to this implementation in scenarios where the + * static stack optimization is useful. One such scenario is when only + * one process is accessing the table and this process is traversing + * the table with a sequence of next/2 calls. + */ + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + +#include "sys.h" +#include "erl_vm.h" +#include "global.h" +#include "erl_process.h" +#include "error.h" +#define ERTS_WANT_DB_INTERNAL__ +#include "erl_db.h" +#include "bif.h" +#include "big.h" +#include "erl_binary.h" + +#include "erl_db_catree.h" +#include "erl_db_tree.h" +#include "erl_db_tree_util.h" + +/* +** Forward declarations +*/ + +static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left); +static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left); +static DbTableCATreeNode *catree_first_base_node_from_free_list(DbTableCATree *tb); + +/* Method interface functions */ +static int db_first_catree(Process *p, DbTable *tbl, + Eterm *ret); +static int db_next_catree(Process *p, DbTable *tbl, + Eterm key, Eterm *ret); +static int db_last_catree(Process *p, DbTable *tbl, + Eterm *ret); +static int db_prev_catree(Process *p, DbTable *tbl, + Eterm key, + Eterm *ret); +static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail); +static int db_get_catree(Process *p, DbTable *tbl, + Eterm key, Eterm *ret); +static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret); +static int db_get_element_catree(Process *p, DbTable *tbl, + Eterm key,int ndex, + Eterm *ret); +static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret); +static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret); +static int db_slot_catree(Process *p, DbTable *tbl, + Eterm slot_term, Eterm *ret); +static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, int reversed, Eterm *ret); +static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, + int reversed, Eterm *ret); +static int db_select_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_count_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_delete_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_replace_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); +static int db_take_catree(Process *, DbTable *, Eterm, Eterm *); +static void db_print_catree(fmtfn_t to, void *to_arg, + int show, DbTable *tbl); +static int db_free_table_catree(DbTable *tbl); +static SWord db_free_table_continue_catree(DbTable *tbl, SWord); +static void db_foreach_offheap_catree(DbTable *, + void (*)(ErlOffHeap *, void *), + void *); +static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds); +static int +db_lookup_dbterm_catree(Process *, DbTable *, Eterm key, Eterm obj, + DbUpdateHandle*); +static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *); + +/* +** External interface +*/ +DbTableMethod db_catree = +{ + db_create_catree, + db_first_catree, + db_next_catree, + db_last_catree, + db_prev_catree, + db_put_catree, + db_get_catree, + db_get_element_catree, + db_member_catree, + db_erase_catree, + db_erase_object_catree, + db_slot_catree, + db_select_chunk_catree, + db_select_catree, + db_select_delete_catree, + db_select_continue_catree, + db_select_delete_continue_catree, + db_select_count_catree, + db_select_count_continue_catree, + db_select_replace_catree, + db_select_replace_continue_catree, + db_take_catree, + db_delete_all_objects_catree, + db_free_table_catree, + db_free_table_continue_catree, + db_print_catree, + db_foreach_offheap_catree, + db_lookup_dbterm_catree, + db_finalize_dbterm_catree + +}; + +/* + * Constants + */ + +#define ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION 200 +#define ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION (-1) +#define ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION (-10) +#define ERL_DB_CATREE_HIGH_CONTENTION_LIMIT 1000 +#define ERL_DB_CATREE_LOW_CONTENTION_LIMIT (-1000) +#define ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT 14 + +/* + * Internal CA tree related helper functions and macros + */ + +#define GET_ROUTE_NODE_KEY(node) (node->baseOrRoute.route.key.tpl[0]) +#define GET_BASE_NODE_LOCK(node) (&(node->baseOrRoute.base.lock)) +#define GET_ROUTE_NODE_LOCK(node) (&(node->baseOrRoute.route.lock)) + + +/* Helpers for reading and writing shared atomic variables */ + +/* No memory barrier */ +#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root))) +#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.left))) +#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.right))) +#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v)) +#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v)); +#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v)); + + +/* Release or acquire barriers */ +#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root))) +#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.left))) +#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.right))) +#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v)) +#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v)); +#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v)); + +/* Compares a key to the key in a route node */ +static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb, + Eterm key, + DbTableCATreeNode *obj) +{ + return CMP(key, GET_ROUTE_NODE_KEY(obj)); +} + +static ERTS_INLINE void push_node_dyn_array(DbTable *tb, + CATreeNodeStack *stack, + DbTableCATreeNode *node) +{ + int i; + if (stack->pos == stack->size) { + DbTableCATreeNode **newArray = + erts_db_alloc(ERTS_ALC_T_DB_STK, tb, + sizeof(DbTableCATreeNode*) * (stack->size*2)); + for (i = 0; i < stack->pos; i++) { + newArray[i] = stack->array[i]; + } + if (stack->size > STACK_NEED) { + /* Dynamically allocated array that needs to be deallocated */ + erts_db_free(ERTS_ALC_T_DB_STK, tb, + stack->array, + sizeof(DbTableCATreeNode *) * stack->size); + } + stack->array = newArray; + stack->size = stack->size*2; + } + PUSH_NODE(stack, node); +} + +/* + * Used by the split_tree function + */ +static ERTS_INLINE +int less_than_two_elements(TreeDbTerm *root) +{ + return root == NULL || (root->left == NULL && root->right == NULL); +} + +/* + * Inserts a TreeDbTerm into a tree. Returns the new root. + */ +static ERTS_INLINE +TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data, + TreeDbTerm *insert_to_root, + TreeDbTerm *value_to_insert) { + /* Non recursive insertion in AVL tree, building our own stack */ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 0; + TreeDbTerm * base = insert_to_root; + TreeDbTerm **this = &base; + Sint c; + Eterm key; + int dir; + TreeDbTerm *p1, *p2, *p; + + key = GETKEY(common_table_data, value_to_insert->dbterm.tpl); + + dstack[dpos++] = DIR_END; + for (;;) + if (!*this) { /* Found our place */ + state = 1; + *this = value_to_insert; + (*this)->balance = 0; + (*this)->left = (*this)->right = NULL; + break; + } else if ((c = cmp_key(common_table_data, key, *this)) < 0) { + /* go lefts */ + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + } else { /* go right */ + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + } + + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + p = *this; + if (dir == DIR_LEFT) { + switch (p->balance) { + case 1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = -1; + break; + case -1: /* The icky case */ + p1 = p->left; + if (p1->balance == -1) { /* Single LL rotation */ + p->left = p1->right; + p1->right = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RR rotation */ + p2 = p1->right; + p1->right = p2->left; + p2->left = p1; + p->left = p2->right; + p2->right = p; + p->balance = (p2->balance == -1) ? +1 : 0; + p1->balance = (p2->balance == 1) ? -1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } else { /* dir == DIR_RIGHT */ + switch (p->balance) { + case -1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = 1; + break; + case 1: + p1 = p->right; + if (p1->balance == 1) { /* Single RR rotation */ + p->right = p1->left; + p1->left = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RL rotation */ + p2 = p1->left; + p1->left = p2->right; + p2->right = p1; + p->right = p2->left; + p2->left = p; + p->balance = (p2->balance == 1) ? -1 : 0; + p1->balance = (p2->balance == -1) ? 1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } + } + return base; +} + +/* + * Split an AVL tree into two trees. The function stores the node + * containing the "split key" in the write back parameter + * split_key_wb. The function stores the left tree containing the keys + * that are smaller than the "split key" in the write back parameter + * left_wb and the tree containing the rest of the keys in the write + * back parameter right_wb. + */ +static void split_tree(DbTableCommon *tb, + TreeDbTerm *root, + TreeDbTerm **split_key_node_wb, + TreeDbTerm **left_wb, + TreeDbTerm **right_wb) { + TreeDbTerm * split_node = NULL; + TreeDbTerm * left_root; + TreeDbTerm * right_root; + if (root->left == NULL) { /* To get non empty split */ + *right_wb = root->right; + *split_key_node_wb = root->right; + root->right = NULL; + root->balance = 0; + *left_wb = root; + return; + } + split_node = root; + left_root = split_node->left; + split_node->left = NULL; + right_root = split_node->right; + split_node->right = NULL; + right_root = insert_TreeDbTerm(tb, + right_root, + split_node); + *split_key_node_wb = split_node; + *left_wb = left_root; + *right_wb = right_root; +} + +/* + * Used by the join_trees function + */ +static ERTS_INLINE int compute_tree_hight(TreeDbTerm * root) +{ + if(root == NULL) { + return 0; + } else { + TreeDbTerm * current_node = root; + int hight_so_far = 1; + while (current_node->left != NULL || current_node->right != NULL) { + if (current_node->balance == -1) { + current_node = current_node->left; + } else { + current_node = current_node->right; + } + hight_so_far = hight_so_far + 1; + } + return hight_so_far; + } +} + +/* + * Used by the join_trees function + */ +static ERTS_INLINE +TreeDbTerm* linkout_min_or_max_tree_node(TreeDbTerm **root, int is_min) +{ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 0; + TreeDbTerm **this = root; + int dir; + TreeDbTerm *q = NULL; + + dstack[dpos++] = DIR_END; + for (;;) { + if (!*this) { /* Failure */ + return NULL; + } else if (is_min && (*this)->left != NULL) { + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + } else if (!is_min && (*this)->right != NULL) { + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + } else { /* Min value, found the one to splice out */ + q = (*this); + if (q->right == NULL) { + (*this) = q->left; + state = 1; + } else if (q->left == NULL) { + (*this) = q->right; + state = 1; + } + break; + } + } + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + if (dir == DIR_LEFT) { + state = tree_balance_left(this); + } else { + state = tree_balance_right(this); + } + } + return q; +} + +#define LINKOUT_MIN_TREE_NODE(root) linkout_min_or_max_tree_node(root, 1) +#define LINKOUT_MAX_TREE_NODE(root) linkout_min_or_max_tree_node(root, 0) + +/* + * Joins two AVL trees where all the keys in the left one are smaller + * then the keys in the right one and returns the resulting tree. + * + * The algorithm is described on page 474 in D. E. Knuth. The Art of + * Computer Programming: Sorting and Searching, + * vol. 3. Addison-Wesley, 2nd edition, 1998. + */ +static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, + TreeDbTerm *right_root_param) +{ + TreeDbTerm **tstack[STACK_NEED]; + int tpos = 0; + int dstack[STACK_NEED+1]; + int dpos = 0; + int state = 1; + TreeDbTerm **this; + int dir; + TreeDbTerm *p1, *p2, *p; + TreeDbTerm *left_root = left_root_param; + TreeDbTerm *right_root = right_root_param; + int left_height; + int right_height; + int current_height; + dstack[dpos++] = DIR_END; + if (left_root == NULL) { + return right_root; + } else if (right_root == NULL) { + return left_root; + } + + left_height = compute_tree_hight(left_root); + right_height = compute_tree_hight(right_root); + if (left_height >= right_height) { + TreeDbTerm * new_root = + LINKOUT_MIN_TREE_NODE(&right_root); + int new_right_height = compute_tree_hight(right_root); + TreeDbTerm * current_node = left_root; + this = &left_root; + current_height = left_height; + while(current_height > new_right_height + 1) { + if (current_node->balance == -1) { + current_height = current_height - 2; + } else { + current_height = current_height - 1; + } + dstack[dpos++] = DIR_RIGHT; + tstack[tpos++] = this; + this = &((*this)->right); + current_node = current_node->right; + } + new_root->left = current_node; + new_root->right = right_root; + new_root->balance = new_right_height - current_height; + *this = new_root; + } else { + /* This case is symmetric to the previous case */ + TreeDbTerm * new_root = + LINKOUT_MAX_TREE_NODE(&left_root); + int new_left_height = compute_tree_hight(left_root); + TreeDbTerm * current_node = right_root; + this = &right_root; + current_height = right_height; + while (current_height > new_left_height + 1) { + if (current_node->balance == 1) { + current_height = current_height - 2; + } else { + current_height = current_height - 1; + } + dstack[dpos++] = DIR_LEFT; + tstack[tpos++] = this; + this = &((*this)->left); + current_node = current_node->left; + } + new_root->right = current_node; + new_root->left = left_root; + new_root->balance = current_height - new_left_height; + *this = new_root; + } + /* Now we need to continue as if this was during the insert */ + while (state && ( dir = dstack[--dpos] ) != DIR_END) { + this = tstack[--tpos]; + p = *this; + if (dir == DIR_LEFT) { + switch (p->balance) { + case 1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = -1; + break; + case -1: /* The icky case */ + p1 = p->left; + if (p1->balance == -1) { /* Single LL rotation */ + p->left = p1->right; + p1->right = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RR rotation */ + p2 = p1->right; + p1->right = p2->left; + p2->left = p1; + p->left = p2->right; + p2->right = p; + p->balance = (p2->balance == -1) ? +1 : 0; + p1->balance = (p2->balance == 1) ? -1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } else { /* dir == DIR_RIGHT */ + switch (p->balance) { + case -1: + p->balance = 0; + state = 0; + break; + case 0: + p->balance = 1; + break; + case 1: + p1 = p->right; + if (p1->balance == 1) { /* Single RR rotation */ + p->right = p1->left; + p1->left = p; + p->balance = 0; + (*this) = p1; + } else { /* Double RL rotation */ + p2 = p1->left; + p1->left = p2->right; + p2->right = p1; + p->right = p2->left; + p2->left = p; + p->balance = (p2->balance == 1) ? -1 : 0; + p1->balance = (p2->balance == -1) ? 1 : 0; + (*this) = p2; + } + (*this)->balance = 0; + state = 0; + break; + } + } + } + /* Return the joined tree */ + if (left_height >= right_height) { + return left_root; + } else { + return right_root; + } +} + + +static ERTS_INLINE +int try_wlock_base_node(DbTableCATreeBaseNode *base_node) +{ + return EBUSY == erts_rwmtx_tryrwlock(&base_node->lock); +} + +/* + * Locks a base node without adjusting the lock statistics + */ +static ERTS_INLINE +void wlock_base_node_no_stats(DbTableCATreeBaseNode *base_node) +{ + erts_rwmtx_rwlock(&base_node->lock); +} + +/* + * Locks a base node and adjusts the lock statistics according to if + * the lock was contended or not + */ +static ERTS_INLINE +void wlock_base_node(DbTableCATreeBaseNode *base_node) +{ + if (try_wlock_base_node(base_node)) { + /* The lock is contended */ + wlock_base_node_no_stats(base_node); + base_node->lock_statistics += ERL_DB_CATREE_LOCK_FAILURE_CONTRIBUTION; + } else { + base_node->lock_statistics += ERL_DB_CATREE_LOCK_SUCCESS_CONTRIBUTION; + } +} + +static ERTS_INLINE +void wunlock_base_node(DbTableCATreeBaseNode *base_node) +{ + erts_rwmtx_rwunlock(&base_node->lock); +} + +static ERTS_INLINE +void rlock_base_node(DbTableCATreeBaseNode *base_node) +{ + erts_rwmtx_rlock(&base_node->lock); +} + +static ERTS_INLINE +void runlock_base_node(DbTableCATreeBaseNode *base_node) +{ + erts_rwmtx_runlock(&base_node->lock); +} + +static ERTS_INLINE +void lock_route_node(DbTableCATreeRouteNode *route_node) +{ + erts_mtx_lock(&route_node->lock); +} + +static ERTS_INLINE +void unlock_route_node(DbTableCATreeRouteNode *route_node) +{ + erts_mtx_unlock(&route_node->lock); +} + + +/* + * The following macros are used to create the ETS functions that only + * need to access one element (e.g. db_put_catree, db_get_catree and + * db_erase_catree). + */ + +#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ + int retry; \ + DbTableCATreeNode *current_node; \ + DbTableCATreeNode *prev_node; \ + DbTableCommon* common_table_data = &tb->common; \ + DbTableCATreeBaseNode *base_node; \ + int current_level; \ + (void)prev_node; \ + do { \ + retry = 0; \ + current_node = GET_ROOT_ACQB(tb); \ + prev_node = NULL; \ + current_level = 0; \ + while ( ! current_node->is_base_node ) { \ + current_level = current_level + 1; \ + prev_node = current_node; \ + if (cmp_key_route(common_table_data,key,current_node) < 0) { \ + current_node = GET_LEFT_ACQB(current_node); \ + } else { \ + current_node = GET_RIGHT_ACQB(current_node); \ + } \ + } \ + base_node = ¤t_node->baseOrRoute.base; \ + LOCK(base_node); \ + if ( ! base_node->is_valid ) { \ + /* Retry */ \ + UNLOCK(base_node); \ + retry = 1; \ + } \ + } while(retry); + + +#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ + if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \ + && current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \ + split_catree(&tb->common, prev_node, current_node); \ + } else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \ + join_catree(tb, prev_node, current_node); \ + } else { \ + wunlock_base_node(base_node); \ + } + +#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ + static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ + Eterm key, \ + PARAMETERS){ \ + int result; \ + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \ + result = SEQUENTAIL_CALL; \ + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \ + return result; \ + } + + +#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ + static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ + Eterm key, \ + PARAMETERS){ \ + int result; \ + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \ + result = SEQUENTAIL_CALL; \ + runlock_base_node(base_node); \ + return result; \ + } + + + +static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb) +{ + DbTableCATreeNode *new_base_node_container = + erts_db_alloc(ERTS_ALC_T_DB_TABLE, + (DbTable *) tb, + sizeof(DbTableCATreeNode)); + DbTableCATreeBaseNode *new_base_node = + &new_base_node_container->baseOrRoute.base; + erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; + new_base_node_container->is_base_node = 1; + new_base_node->root = NULL; + if (tb->common.type & DB_FREQ_READ) + rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; + if (erts_ets_rwmtx_spin_count >= 0) + rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; + erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt, + "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB); + new_base_node->lock_statistics = 0; + new_base_node->is_valid = 1; + new_base_node->tab = (DbTable *) tb; + return new_base_node_container; +} + +static DbTableCATreeNode* +create_catree_route_node(DbTableCommon * common_table_data, + DbTableCATreeNode *left, + DbTableCATreeNode *right, + DbTerm * keyTerm) +{ + Eterm* top; + Eterm key = GETKEY(common_table_data,keyTerm->tpl); + int key_size = size_object(key); + Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) + + offsetof(DbTableCATreeRouteNode,key); + size_t route_node_container_size = + offset + + sizeof(DbTerm) + + sizeof(Eterm)*key_size; + ErlOffHeap tmp_offheap; + byte* new_route_node_container_bytes = + erts_db_alloc(ERTS_ALC_T_DB_TABLE, + (DbTable *) common_table_data, + route_node_container_size); + DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset); + DbTableCATreeNode *new_route_node_container = + (DbTableCATreeNode*)new_route_node_container_bytes; + DbTableCATreeRouteNode *new_route_node = + &new_route_node_container->baseOrRoute.route; + new_route_node->tab = (DbTable *)common_table_data; + if (key_size != 0) { + newp->size = key_size; + top = &newp->tpl[1]; + tmp_offheap.first = NULL; + newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap); + newp->first_oh = tmp_offheap.first; + } else { + newp->size = key_size; + newp->first_oh = NULL; + newp->tpl[0] = key; + } + new_route_node_container->is_base_node = 0; + new_route_node->is_valid = 1; + erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left); + erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right); + erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node", + NIL, ERTS_LOCK_FLAGS_CATEGORY_DB); + return new_route_node_container; +} + +static void free_catree_base_node(void* base_node_container_ptr) +{ + DbTableCATreeNode *base_node_container = + (DbTableCATreeNode *)base_node_container_ptr; + DbTableCATreeBaseNode *base_node = + &base_node_container->baseOrRoute.base; + erts_rwmtx_destroy(&base_node->lock); + erts_db_free(ERTS_ALC_T_DB_TABLE, + base_node->tab, base_node_container, + sizeof(DbTableCATreeNode)); +} + +static void free_catree_routing_node(void *route_node_container_ptr) +{ + size_t route_node_container_size; + byte* route_node_container_bytes = route_node_container_ptr; + DbTableCATreeNode *route_node_container = + (DbTableCATreeNode *)route_node_container_bytes; + DbTableCATreeRouteNode *route_node = + &route_node_container->baseOrRoute.route; + int key_size = route_node->key.size; + Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) + + offsetof(DbTableCATreeRouteNode,key); + ErlOffHeap tmp_oh; + DbTerm* db_term = (DbTerm*) (route_node_container_bytes + offset); + erts_mtx_destroy(&route_node->lock); + route_node_container_size = + offset + + sizeof(DbTerm) + + sizeof(Eterm)*key_size; + if (key_size != 0) { + tmp_oh.first = db_term->first_oh; + erts_cleanup_offheap(&tmp_oh); + } + erts_db_free(ERTS_ALC_T_DB_TABLE, + route_node->tab, + route_node_container, + route_node_container_size); +} + +/* + * Returns the parent routing node of the specified + * route_node_container if such a routing node exists or NULL if + * route_node_container is attached to the root + */ +static ERTS_INLINE DbTableCATreeNode * +parent_of(DbTableCATree *tb, + DbTableCATreeNode *route_node_container) +{ + + Eterm key = GET_ROUTE_NODE_KEY(route_node_container); + DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb); + DbTableCATreeNode *prev_node = NULL; + if (current_node == route_node_container) { + return NULL; + } + while (current_node != route_node_container) { + prev_node = current_node; + if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) { + current_node = GET_LEFT_ACQB(current_node); + } else { + current_node = GET_RIGHT_ACQB(current_node); + } + } + return prev_node; +} + + +static ERTS_INLINE DbTableCATreeNode * +leftmost_base_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + while (!node->is_base_node) { + node = GET_LEFT_ACQB(node); + } + return node; +} + + +static ERTS_INLINE DbTableCATreeNode * +rightmost_base_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + while (!node->is_base_node) { + node = GET_RIGHT_ACQB(node); + } + return node; +} + + +static ERTS_INLINE DbTableCATreeNode * +leftmost_route_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode *node = root; + DbTableCATreeNode *prev_node = NULL; + while (!node->is_base_node) { + prev_node = node; + node = GET_LEFT_ACQB(node); + } + if (prev_node == NULL) { + return NULL; + } else { + return prev_node; + } +} + +static ERTS_INLINE DbTableCATreeNode* +rightmost_route_node(DbTableCATreeNode *root) +{ + DbTableCATreeNode * node = root; + DbTableCATreeNode * prev_node = NULL; + while (!node->is_base_node) { + prev_node = node; + node = GET_RIGHT_ACQB(node); + } + if (prev_node == NULL) { + return NULL; + } else { + return prev_node; + } +} + +static ERTS_INLINE DbTableCATreeNode* +leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack) +{ + DbTableCATreeNode * node = root; + while (!node->is_base_node) { + PUSH_NODE(stack, node); + node = GET_LEFT_ACQB(node); + } + return node; +} + +static ERTS_INLINE DbTableCATreeNode* +get_next_base_node_and_path(DbTableCommon *common_table_data, + DbTableCATreeNode *base_node, + CATreeNodeStack *stack) +{ + if (EMPTY_NODE(stack)) { /* The parent of b is the root */ + return NULL; + } else { + if (GET_LEFT(TOP_NODE(stack)) == base_node) { + return leftmost_base_node_and_path( + GET_RIGHT_ACQB(TOP_NODE(stack)), + stack); + } else { + Eterm pkey = + TOP_NODE(stack)->baseOrRoute.route.key.tpl[0]; /* pKey = key of parent */ + POP_NODE(stack); + while (!EMPTY_NODE(stack)) { + if (TOP_NODE(stack)->baseOrRoute.route.is_valid && + cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) { + return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack); + } else { + POP_NODE(stack); + } + } + } + return NULL; + } +} + +static ERTS_INLINE void +clone_stack(CATreeNodeStack *search_stack_ptr, + CATreeNodeStack *search_stack_copy_ptr) +{ + int i; + search_stack_copy_ptr->pos = search_stack_ptr->pos; + for (i = 0; i < search_stack_ptr->pos; i++) { + search_stack_copy_ptr->array[i] = search_stack_ptr->array[i]; + } +} + + + +static ERTS_INLINE DbTableCATreeNode* +lock_first_base_node(DbTable *tbl, + CATreeNodeStack *search_stack_ptr, + CATreeNodeStack *locked_base_nodes_stack_ptr) +{ + int retry; + DbTableCATreeNode *current_node; + DbTableCATreeBaseNode *base_node; + DbTableCATree* tb = &tbl->catree; + do { + retry = 0; + current_node = GET_ROOT_ACQB(tb); + while ( ! current_node->is_base_node ) { + PUSH_NODE(search_stack_ptr, current_node); + current_node = GET_LEFT_ACQB(current_node); + } + base_node = ¤t_node->baseOrRoute.base; + rlock_base_node(base_node); + if ( ! base_node->is_valid ) { + /* Retry */ + runlock_base_node(base_node); + retry = 1; + } + } while(retry); + push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); + return current_node; +} + +static ERTS_INLINE DbTableCATreeBaseNode* +find_and_lock_next_base_node_and_path(DbTable *tbl, + CATreeNodeStack **search_stack_ptr_ptr, + CATreeNodeStack **search_stack_copy_ptr_ptr, + CATreeNodeStack *locked_base_nodes_stack_ptr) +{ + DbTableCATreeNode *current_node; + DbTableCATreeBaseNode *base_node; + CATreeNodeStack * tmp_stack_ptr; + DbTableCommon* common_table_data; + retry_find_and_lock_next_base_node: + current_node = TOP_NODE(locked_base_nodes_stack_ptr); + common_table_data = &tbl->common; + clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr); + current_node = + get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr); + if (current_node == NULL) { + return NULL; + } + base_node = ¤t_node->baseOrRoute.base; + rlock_base_node(base_node); + if ( ! base_node->is_valid ) { + /* Retry */ + runlock_base_node(base_node); + /* Revert to previous state */ + current_node = TOP_NODE(locked_base_nodes_stack_ptr); + tmp_stack_ptr = *search_stack_ptr_ptr; + *search_stack_ptr_ptr = *search_stack_copy_ptr_ptr; + *search_stack_copy_ptr_ptr = tmp_stack_ptr; + goto retry_find_and_lock_next_base_node; + } else { + push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); + } + return base_node; +} + +static ERTS_INLINE +void unlock_and_release_locked_base_node_stack(DbTable *tbl, + CATreeNodeStack *locked_base_nodes_stack_ptr) +{ + DbTableCATreeNode *current_node; + DbTableCATreeBaseNode *base_node; + int i; + for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) { + current_node = locked_base_nodes_stack_ptr->array[i]; + base_node = ¤t_node->baseOrRoute.base; + if (locked_base_nodes_stack_ptr->pos > 1) { + base_node->lock_statistics = /* This is not atomic which is fine as */ + base_node->lock_statistics + /* correctness does not depend on that. */ + ERL_DB_CATREE_LOCK_MORE_THAN_ONE_CONTRIBUTION; + } + runlock_base_node(base_node); + } + if (locked_base_nodes_stack_ptr->size > STACK_NEED) { + erts_db_free(ERTS_ALC_T_DB_STK, tbl, + locked_base_nodes_stack_ptr->array, + sizeof(DbTableCATreeNode *) * locked_base_nodes_stack_ptr->size); + } +} + +static ERTS_INLINE +void init_stack(CATreeNodeStack *stack, + DbTableCATreeNode **stack_array, + Uint init_size) +{ + stack->array = stack_array; + stack->pos = 0; + stack->size = init_size; +} + +static ERTS_INLINE +void init_tree_stack(DbTreeStack *stack, + TreeDbTerm **stack_array, + Uint init_slot) +{ + stack->array = stack_array; + stack->pos = 0; + stack->slot = init_slot; +} + +#define DEC_ROUTE_NODE_STACK_AND_ARRAY(STACK_NAME) \ + CATreeNodeStack STACK_NAME; \ + CATreeNodeStack * STACK_NAME##_ptr = &(STACK_NAME); \ + DbTableCATreeNode *STACK_NAME##_array[STACK_NEED]; + +#define DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS \ +DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack) \ +DEC_ROUTE_NODE_STACK_AND_ARRAY(search_stack_copy) \ +DEC_ROUTE_NODE_STACK_AND_ARRAY(locked_base_nodes_stack) \ +init_stack(&search_stack, search_stack_array, 0); \ +init_stack(&search_stack_copy, search_stack_copy_array, 0); \ +init_stack(&locked_base_nodes_stack, locked_base_nodes_stack_array, STACK_NEED);/* Abuse as stack array size*/ + +/* + * Locks and returns the base node that contains the specified key if + * it is present. The function saves the search path to the found base + * node in search_stack_ptr and adds the found base node to + * locked_base_nodes_stack_ptr. + */ +static ERTS_INLINE DbTableCATreeBaseNode * +lock_base_node_with_key(DbTable *tbl, + Eterm key, + CATreeNodeStack * search_stack_ptr, + CATreeNodeStack * locked_base_nodes_stack_ptr) +{ + int retry; + DbTableCATreeNode *current_node; + DbTableCATreeBaseNode *base_node; + DbTableCATree* tb = &tbl->catree; + DbTableCommon* common_table_data = &tbl->common; + do { + retry = 0; + current_node = GET_ROOT_ACQB(tb); + while ( ! current_node->is_base_node ) { + PUSH_NODE(search_stack_ptr, current_node); + if( cmp_key_route(common_table_data,key,current_node) < 0 ) { + current_node = GET_LEFT_ACQB(current_node); + } else { + current_node = GET_RIGHT_ACQB(current_node); + } + } + base_node = ¤t_node->baseOrRoute.base; + rlock_base_node(base_node); + if ( ! base_node->is_valid ) { + /* Retry */ + runlock_base_node(base_node); + retry = 1; + } + } while(retry); + push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node); + return base_node; +} + +/* + * Joins a base node with it's right neighbor. Returns the base node + * resulting from the join in locked state or NULL if there is no base + * node to join with. + */ +static DbTableCATreeNode* +erl_db_catree_force_join_right(DbTableCommon *common_table_data, + DbTableCATreeNode *parent_container, + DbTableCATreeNode *base_container, + DbTableCATreeNode **result_parent_wb) +{ + DbTableCATreeRouteNode *parent; + DbTableCATreeNode *gparent_container; + DbTableCATreeRouteNode *gparent; + DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base; + DbTableCATree *tb = (DbTableCATree *)common_table_data; + DbTableCATreeNode *neighbor_base_container; + DbTableCATreeBaseNode *neighbor_base; + DbTableCATreeNode *new_neighbor_base; + DbTableCATreeNode *neighbor_base_parent; + int neighbour_not_valid; + if (parent_container == NULL) { + return NULL; + } + parent = &parent_container->baseOrRoute.route; + do { + neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container)); + neighbor_base = &neighbor_base_container->baseOrRoute.base; + wlock_base_node_no_stats(neighbor_base); + neighbour_not_valid = !neighbor_base->is_valid; + if (neighbour_not_valid) { + wunlock_base_node(neighbor_base); + } + } while (neighbour_not_valid); + lock_route_node(parent); + parent->is_valid = 0; + neighbor_base->is_valid = 0; + base->is_valid = 0; + gparent = NULL; + gparent_container = NULL; + do { + if (gparent != NULL) { + unlock_route_node(gparent); + } + gparent_container = parent_of(tb, parent_container); + if (gparent_container != NULL) { + gparent = &gparent_container->baseOrRoute.route; + lock_route_node(gparent); + } else { + gparent = NULL; + } + } while (gparent != NULL && !gparent->is_valid); + if (gparent == NULL) { + SET_ROOT_RELB(tb, GET_RIGHT(parent_container)); + } else if (GET_LEFT(gparent_container) == parent_container) { + SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container)); + } else { + SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container)); + } + unlock_route_node(parent); + if (gparent != NULL) { + unlock_route_node(gparent); + } + new_neighbor_base = create_catree_base_node(tb); + new_neighbor_base->baseOrRoute.base.root = + join_trees(base->root, neighbor_base->root); + wlock_base_node_no_stats(&(new_neighbor_base->baseOrRoute.base)); + neighbor_base_parent = NULL; + if (GET_RIGHT(parent_container) == neighbor_base_container) { + neighbor_base_parent = gparent_container; + } else { + neighbor_base_parent = + leftmost_route_node(GET_RIGHT(parent_container)); + } + if(neighbor_base_parent == NULL) { + SET_ROOT_RELB(tb, new_neighbor_base); + } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) { + SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base); + } else { + SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base); + } + wunlock_base_node(base); + wunlock_base_node(neighbor_base); + /* Free the parent and base */ + erts_schedule_thr_prgr_later_op(free_catree_routing_node, + parent_container, + &parent->free_item); + erts_schedule_thr_prgr_later_op(free_catree_base_node, + base_container, + &base->free_item); + erts_schedule_thr_prgr_later_op(free_catree_base_node, + neighbor_base_container, + &neighbor_base->free_item); + + if (parent_container == neighbor_base_container) { + *result_parent_wb = gparent_container; + } else { + *result_parent_wb = neighbor_base_parent; + } + return new_neighbor_base; +} + +/* + * Used to merge together all base nodes for operations such as last + * and select_*. Returns the base node resulting from the merge in + * locked state. + */ +static DbTableCATreeNode * +merge_to_one_locked_base_node(DbTableCommon * common_table_data) +{ + DbTableCATreeNode *parent_container; + DbTableCATreeNode *new_parent_container; + DbTableCATree *tb = (DbTableCATree *)common_table_data; + DbTableCATreeNode *base_container; + DbTableCATreeNode *new_base_container; + int is_not_valid; + /* Find first base node */ + do { + parent_container = NULL; + base_container = GET_ROOT_ACQB(tb); + while ( ! base_container->is_base_node ) { + parent_container = base_container; + base_container = GET_LEFT_ACQB(base_container); + } + wlock_base_node_no_stats(&(base_container->baseOrRoute.base)); + is_not_valid = ! base_container->baseOrRoute.base.is_valid; + if (is_not_valid) { + wunlock_base_node(&(base_container->baseOrRoute.base)); + } + } while(is_not_valid); + do { + new_base_container = erl_db_catree_force_join_right(common_table_data, + parent_container, + base_container, + &new_parent_container); + if (new_base_container != NULL) { + base_container = new_base_container; + parent_container = new_parent_container; + } + } while(new_base_container != NULL); + return base_container; +} + + +static void join_catree(DbTableCATree *tb, + DbTableCATreeNode *parent_container, + DbTableCATreeNode *base_container) +{ + DbTableCATreeRouteNode *parent; + DbTableCATreeNode *gparent_container; + DbTableCATreeRouteNode *gparent; + DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base; + DbTableCATreeNode *neighbor_base_container; + DbTableCATreeBaseNode *neighbor_base; + DbTableCATreeNode *new_neighbor_base; + DbTableCATreeNode *neighbor_base_parent; + if (parent_container == NULL) { + base->lock_statistics = 0; + wunlock_base_node(base); + return; + } + parent = &parent_container->baseOrRoute.route; + if (GET_LEFT(parent_container) == base_container) { + neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container)); + neighbor_base = &neighbor_base_container->baseOrRoute.base; + if (try_wlock_base_node(neighbor_base)) { + /* Failed to acquire lock */ + base->lock_statistics = 0; + wunlock_base_node(base); + return; + } else if (!neighbor_base->is_valid) { + base->lock_statistics = 0; + wunlock_base_node(base); + wunlock_base_node(neighbor_base); + return; + } else { + lock_route_node(parent); + parent->is_valid = 0; + neighbor_base->is_valid = 0; + base->is_valid = 0; + gparent = NULL; + gparent_container = NULL; + do { + if (gparent != NULL) { + unlock_route_node(gparent); + } + gparent_container = parent_of(tb, parent_container); + if (gparent_container != NULL) { + gparent = &gparent_container->baseOrRoute.route; + lock_route_node(gparent); + } else { + gparent = NULL; + } + } while (gparent != NULL && !gparent->is_valid); + if (gparent == NULL) { + SET_ROOT_RELB(tb, GET_RIGHT(parent_container)); + } else if (GET_LEFT(gparent_container) == parent_container) { + SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container)); + } else { + SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container)); + } + unlock_route_node(parent); + if (gparent != NULL) { + unlock_route_node(gparent); + } + new_neighbor_base = create_catree_base_node(tb); + new_neighbor_base->baseOrRoute.base.root = + join_trees(base->root, neighbor_base->root); + neighbor_base_parent = NULL; + if (GET_RIGHT(parent_container) == neighbor_base_container) { + neighbor_base_parent = gparent_container; + } else { + neighbor_base_parent = + leftmost_route_node(GET_RIGHT(parent_container)); + } + } + } else { /* Symetric case */ + neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container)); + neighbor_base = &neighbor_base_container->baseOrRoute.base; + if (try_wlock_base_node(neighbor_base)) { + /* Failed to acquire lock */ + base->lock_statistics = 0; + wunlock_base_node(base); + return; + } else if (!neighbor_base->is_valid) { + base->lock_statistics = 0; + wunlock_base_node(base); + wunlock_base_node(neighbor_base); + return; + } else { + lock_route_node(parent); + parent->is_valid = 0; + neighbor_base->is_valid = 0; + base->is_valid = 0; + gparent = NULL; + gparent_container = NULL; + do { + if (gparent != NULL) { + unlock_route_node(gparent); + } + gparent_container = parent_of(tb, parent_container); + if (gparent_container != NULL) { + gparent = &gparent_container->baseOrRoute.route; + lock_route_node(gparent); + } else { + gparent = NULL; + } + } while (gparent != NULL && !gparent->is_valid); + if (gparent == NULL) { + SET_ROOT_RELB(tb, GET_LEFT(parent_container)); + } else if (GET_RIGHT(gparent_container) == parent_container) { + SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container)); + } else { + SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container)); + } + unlock_route_node(parent); + if (gparent != NULL) { + unlock_route_node(gparent); + } + new_neighbor_base = create_catree_base_node(tb); + new_neighbor_base->baseOrRoute.base.root = + join_trees(neighbor_base->root, base->root); + neighbor_base_parent = NULL; + if (GET_LEFT(parent_container) == neighbor_base_container) { + neighbor_base_parent = gparent_container; + } else { + neighbor_base_parent = + rightmost_route_node(GET_LEFT(parent_container)); + } + } + } + /* Link in new neighbor and free nodes that are no longer in the tree */ + if (neighbor_base_parent == NULL) { + SET_ROOT_RELB(tb, new_neighbor_base); + } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) { + SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base); + } else { + SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base); + } + wunlock_base_node(base); + wunlock_base_node(neighbor_base); + /* Free the parent and base */ + erts_schedule_thr_prgr_later_op(free_catree_routing_node, + parent_container, + &parent->free_item); + erts_schedule_thr_prgr_later_op(free_catree_base_node, + base_container, + &base->free_item); + erts_schedule_thr_prgr_later_op(free_catree_base_node, + neighbor_base_container, + &neighbor_base->free_item); +} + + +static void split_catree(DbTableCommon *tb, + DbTableCATreeNode *parent_container, + DbTableCATreeNode *base_container) { + TreeDbTerm *splitOutWriteBack; + TreeDbTerm *leftWriteBack; + TreeDbTerm *rightWriteBack; + DbTableCATreeNode *left_base_node; + DbTableCATreeNode *right_base_node; + DbTableCATreeNode *routing_node_container; + DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base; + DbTableCATreeRouteNode *parent; + if (parent_container == NULL) { + parent = NULL; + } else { + parent = &parent_container->baseOrRoute.route; + } + + if (less_than_two_elements(base->root)) { + base->lock_statistics = 0; + wunlock_base_node(base); + return; + } else { + /* Split the tree */ + split_tree(tb, + base->root, + &splitOutWriteBack, + &leftWriteBack, + &rightWriteBack); + /* Create new base nodes */ + left_base_node = + create_catree_base_node((DbTableCATree*)tb); + right_base_node = + create_catree_base_node((DbTableCATree*)tb); + left_base_node->baseOrRoute.base.root = leftWriteBack; + right_base_node->baseOrRoute.base.root = rightWriteBack; + routing_node_container = create_catree_route_node(tb, + left_base_node, + right_base_node, + &splitOutWriteBack->dbterm); + if (parent == NULL) { + SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container); + } else if(GET_LEFT(parent_container) == base_container) { + SET_LEFT_RELB(parent_container, routing_node_container); + } else { + SET_RIGHT_RELB(parent_container, routing_node_container); + } + base->is_valid = 0; + wunlock_base_node(base); + erts_schedule_thr_prgr_later_op(free_catree_base_node, + base_container, + &base->free_item); + } +} + +/* + * Helper functions for removing the table + */ + +static void catree_add_base_node_to_free_list( + DbTableCATree *tb, + DbTableCATreeNode *base_node_container) +{ + base_node_container->baseOrRoute.base.next = + tb->base_nodes_to_free_list; + tb->base_nodes_to_free_list = base_node_container; +} + +static void catree_deque_base_node_from_free_list(DbTableCATree *tb) +{ + if (tb->base_nodes_to_free_list == NULL) { + return; /* List empty */ + } else { + DbTableCATreeNode *first = tb->base_nodes_to_free_list; + tb->base_nodes_to_free_list = first->baseOrRoute.base.next; + } +} + +static DbTableCATreeNode *catree_first_base_node_from_free_list( + DbTableCATree *tb) +{ + return tb->base_nodes_to_free_list; +} + +static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left) +{ + DbTableCATreeNode *root; + DbTableCATreeNode *p; + for (;;) { + root = POP_NODE(&tb->free_stack_rnodes); + if (root == NULL) break; + else if(root->is_base_node) { + catree_add_base_node_to_free_list(tb, root); + break; + } + for (;;) { + if ((GET_LEFT(root) != NULL) && + (p = GET_LEFT(root))->is_base_node) { + SET_LEFT(root, NULL); + catree_add_base_node_to_free_list(tb, p); + } else if ((GET_RIGHT(root) != NULL) && + (p = GET_RIGHT(root))->is_base_node) { + SET_RIGHT(root, NULL); + catree_add_base_node_to_free_list(tb, p); + } else if ((p = GET_LEFT(root)) != NULL) { + SET_LEFT(root, NULL); + PUSH_NODE(&tb->free_stack_rnodes, root); + root = p; + } else if ((p = GET_RIGHT(root)) != NULL) { + SET_RIGHT(root, NULL); + PUSH_NODE(&tb->free_stack_rnodes, root); + root = p; + } else { + free_catree_routing_node(root); + if (--num_left >= 0) { + break; + } else { + return num_left; /* Done enough for now */ + } + } + } + } + return num_left; +} + +static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left) +{ + TreeDbTerm *root; + TreeDbTerm *p; + DbTableCATreeNode *base_node_container = + catree_first_base_node_from_free_list(tb); + for (;;) { + root = POP_NODE(&tb->free_stack_elems); + if (root == NULL) break; + for (;;) { + if ((p = root->left) != NULL) { + root->left = NULL; + PUSH_NODE(&tb->free_stack_elems, root); + root = p; + } else if ((p = root->right) != NULL) { + root->right = NULL; + PUSH_NODE(&tb->free_stack_elems, root); + root = p; + } else { + free_term((DbTable*)tb, root); + if (--num_left >= 0) { + break; + } else { + return num_left; /* Done enough for now */ + } + } + } + } + catree_deque_base_node_from_free_list(tb); + free_catree_base_node(base_node_container); + base_node_container = catree_first_base_node_from_free_list(tb); + if (base_node_container != NULL) { + PUSH_NODE(&tb->free_stack_elems, base_node_container->baseOrRoute.base.root); + } + return num_left; +} + + +/* +** Initialization function +*/ + +void db_initialize_catree(void) +{ + return; +}; + +/* +** Table interface routines (i.e., what's called by the bif's) +*/ + +int db_create_catree(Process *p, DbTable *tbl) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode *root = create_catree_base_node(tb); + tb->deletion = 0; + tb->base_nodes_to_free_list = NULL; + erts_atomic_init_relb(&(tb->root), (erts_aint_t)root); + return DB_ERROR_NONE; +} + +static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret) +{ + DbTableCATreeBaseNode *base_node; + int result; + DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS; + /* Find first base node */ + base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->baseOrRoute.base); + /* Find next base node until non-empty base node is found */ + while (base_node != NULL && base_node->root == NULL) { + base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr); + } + /* Get the return value */ + result = db_first_tree_common(p, tbl, (base_node == NULL ? NULL : base_node->root), ret, NULL); + /* Unlock base nodes */ + unlock_and_release_locked_base_node_stack(tbl, locked_base_nodes_stack_ptr); + return result; +} + +static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTreeStack next_search_stack; + TreeDbTerm *next_search_stack_array[STACK_NEED]; + DbTableCATreeBaseNode *base_node; + int result = 0; + DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS; + init_tree_stack(&next_search_stack, next_search_stack_array, 0); + /* Lock base node with key */ + base_node = lock_base_node_with_key(tbl, key, &search_stack, &locked_base_nodes_stack); + /* Continue until key is not >= greatest key in base_node */ + while (base_node != NULL) { + result = db_next_tree_common(p, tbl, base_node->root, key, + ret, &next_search_stack); + if (result != DB_ERROR_NONE || *ret != am_EOT) { + break; + } + base_node = + find_and_lock_next_base_node_and_path(tbl, + &search_stack_ptr, + &search_stack_copy_ptr, + locked_base_nodes_stack_ptr); + } + unlock_and_release_locked_base_node_stack(tbl, &locked_base_nodes_stack); + return result; +} + +static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + int result = db_last_tree_common(p, tbl, base->baseOrRoute.base.root, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; + +} + +static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + init_tree_stack(&stack, stack_array, 0); + base = merge_to_one_locked_base_node(&tb->common); + result = db_prev_tree_common(p, tbl, base->baseOrRoute.base.root, key, ret, &stack); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail +ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put, + ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS, + db_put_tree_common(&tb->common, + &base_node->root, + obj, + key_clash_fail, + NULL);) + +static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail) +{ + DbTableCATree *tb = &tbl->catree; + Eterm key = GETKEY(&tb->common, tuple_val(obj)); + return erl_db_catree_do_operation_put(tb, + key, + obj, + key_clash_fail); +} + +#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret +ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get, + ERL_DB_CATREE_DO_OPERATION_GET_PARAMS, + db_get_tree_common(p, &tb->common, + base_node->root, + key, ret, NULL);) + +static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + return erl_db_catree_do_operation_get(tb, key, p, ret); +} + +#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret +ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member, + ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS, + db_member_tree_common(&tb->common, + base_node->root, + key, + ret, + NULL);) + +static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + return erl_db_catree_do_operation_member(tb, key, ret); +} + +#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret +ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element, + ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS, + db_get_element_tree_common(p, &tb->common, + base_node->root, + key, ndex, + ret, NULL)) + +static int db_get_element_catree(Process *p, DbTable *tbl, + Eterm key, int ndex, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret); +} + +#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret +ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase, + ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS, + db_erase_tree_common(tbl, + &base_node->root, + key, + ret, + NULL);) + +static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + return erl_db_catree_do_operation_erase(tb, key, tbl, ret); +} + +#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret +ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object, + ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS, + db_erase_object_tree_common(tbl, + &base_node->root, + object, + ret, + NULL);) + +static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + Eterm key = GETKEY(&tb->common, tuple_val(object)); + return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret); +} + + +static int db_slot_catree(Process *p, DbTable *tbl, + Eterm slot_term, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_slot_tree_common(p, tbl, base->baseOrRoute.base.root, + slot_term, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_continue_tree_common(p, &tb->common, &base->baseOrRoute.base.root, + continuation, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + + +static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, int reverse, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_tree_common(p, &tb->common, &base->baseOrRoute.base.root, + tid, pattern, reverse, ret, + NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_count_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_count_continue_tree_common(p, &tb->common, + &base->baseOrRoute.base.root, + continuation, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_count_tree_common(p, &tb->common, &base->baseOrRoute.base.root, + tid, pattern, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, + int reversed, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_chunk_tree_common(p, &tb->common, &base->baseOrRoute.base.root, + tid, pattern, chunk_size, reversed, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_delete_continue_catree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + int result; + DbTableCATreeNode *base; + init_tree_stack(&stack, stack_array, 0); + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_delete_continue_tree_common(p, tbl, &base->baseOrRoute.base.root, + continuation, ret, &stack); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + DbTreeStack stack; + TreeDbTerm * stack_array[STACK_NEED]; + int result; + DbTableCATreeNode *base; + init_tree_stack(&stack, stack_array, 0); + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_delete_tree_common(p, tbl, &base->baseOrRoute.base.root, + tid, pattern, ret, &stack); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_replace_tree_common(p, &tb->common, + &base->baseOrRoute.base.root, + tid, pattern, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +static int db_select_replace_continue_catree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + int result; + DbTableCATreeNode *base; + base = merge_to_one_locked_base_node(&tb->common); + result = db_select_replace_continue_tree_common(p, &tb->common, + &base->baseOrRoute.base.root, + continuation, ret, NULL); + wunlock_base_node(&(base->baseOrRoute.base)); + return result; +} + +#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl +ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take, + ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS, + db_take_tree_common(p, tbl, + &base_node->root, + key, ret, NULL);) + +static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableCATree *tb = &tbl->catree; + return erl_db_catree_do_operation_take(tb, key, p, ret, tbl); +} + +/* +** Other interface routines (not directly coupled to one bif) +*/ + + +/* Display tree contents (for dump) */ +static void db_print_catree(fmtfn_t to, void *to_arg, + int show, DbTable *tbl) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + db_print_tree_common(to, to_arg, show, base->baseOrRoute.base.root, tbl); + wunlock_base_node(&(base->baseOrRoute.base)); +} + +/* Release all memory occupied by a single table */ +static int db_free_table_catree(DbTable *tbl) +{ + while (db_free_table_continue_catree(tbl, ERTS_SWORD_MAX) < 0) + ; + return 1; +} + +static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds) +{ + DbTableCATreeNode *first_base_node; + DbTableCATree *tb = &tbl->catree; + if (!tb->deletion) { + tb->deletion = 1; + tb->free_stack_elems.array = + erts_db_alloc(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + sizeof(TreeDbTerm *) * STACK_NEED); + tb->free_stack_elems.pos = 0; + tb->free_stack_elems.slot = 0; + tb->free_stack_rnodes.array = + erts_db_alloc(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + sizeof(DbTableCATreeNode *) * STACK_NEED); + tb->free_stack_rnodes.pos = 0; + tb->free_stack_rnodes.size = STACK_NEED; + PUSH_NODE(&tb->free_stack_rnodes, GET_ROOT(tb)); + tb->is_routing_nodes_freed = 0; + tb->base_nodes_to_free_list = NULL; + } + if ( ! tb->is_routing_nodes_freed ) { + reds = do_free_routing_nodes_catree_cont(tb, reds); + if (reds < 0) { + return reds; /* Not finished */ + } else { + tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */ + first_base_node = catree_first_base_node_from_free_list(tb); + PUSH_NODE(&tb->free_stack_elems, first_base_node->baseOrRoute.base.root); + } + } + while (catree_first_base_node_from_free_list(tb) != NULL) { + reds = do_free_base_node_cont(tb, reds); + if (reds < 0) { + return reds; /* Continue later */ + } + } + /* Time to free the main structure*/ + erts_db_free(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + (void *) tb->free_stack_elems.array, + sizeof(TreeDbTerm *) * STACK_NEED); + erts_db_free(ERTS_ALC_T_DB_STK, + (DbTable *) tb, + (void *) tb->free_stack_rnodes.array, + sizeof(DbTableCATreeNode *) * STACK_NEED); + return 1; +} + +static SWord db_delete_all_objects_catree(Process* p, DbTable* tbl, SWord reds) +{ + reds = db_free_table_continue_catree(tbl, reds); + if (reds < 0) + return reds; + db_create_catree(p, tbl); + erts_atomic_set_nob(&tbl->catree.common.nitems, 0); + return reds; +} + + +static void db_foreach_offheap_catree(DbTable *tbl, + void (*func)(ErlOffHeap *, void *), + void *arg) +{ + DbTableCATree *tb = &tbl->catree; + DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common); + db_foreach_offheap_tree_common(base->baseOrRoute.base.root, func, arg); + wunlock_base_node(&(base->baseOrRoute.base)); +} + +static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj, + DbUpdateHandle *handle) +{ + DbTableCATree *tb = &tbl->catree; + int res; + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node); + res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL); + if (res == 0) { + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + } else { + /* db_finalize_dbterm_catree will unlock */ + handle->lck = prev_node; + handle->lck2 = current_node; + handle->current_level = current_level; + } + return res; +} + +static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle) +{ + DbTableCATree *tb = &(handle->tb->catree); + DbTableCATreeNode *prev_node = handle->lck; + DbTableCATreeNode *current_node = handle->lck2; + int current_level = handle->current_level; + DbTableCATreeBaseNode *base_node = ¤t_node->baseOrRoute.base; + db_finalize_dbterm_tree_common(cret, handle, NULL); + ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART; + return; +} + +#ifdef ERTS_ENABLE_LOCK_COUNT +static void erts_lcnt_enable_db_catree_lock_count_helper(DbTableCATree *tb, + DbTableCATreeNode *node, + int enable) +{ + erts_lcnt_ref_t *lcnt_ref; + erts_lock_flags_t lock_type; + if (node->is_base_node) { + lcnt_ref = &GET_BASE_NODE_LOCK(node)->lcnt; + lock_type = ERTS_LOCK_TYPE_RWMUTEX; + } else { + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_LEFT(node), enable); + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_RIGHT(node), enable); + lcnt_ref = &GET_ROUTE_NODE_LOCK(node)->lcnt; + lock_type = ERTS_LOCK_TYPE_MUTEX; + } + if (enable) { + erts_lcnt_install_new_lock_info(lcnt_ref, "db_hash_slot", tb->common.the_name, + lock_type | ERTS_LOCK_FLAGS_CATEGORY_DB); + } else { + erts_lcnt_uninstall(lcnt_ref); + } +} + +void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable) +{ + erts_lcnt_enable_db_catree_lock_count_helper(tb, GET_ROOT(tb), enable); +} +#endif /* ERTS_ENABLE_LOCK_COUNT */ + + +#ifdef HARDDEBUG + +/* + * Not called, but kept as it might come to use + */ +static inline int my_check_table_tree(TreeDbTerm *t) +{ + int lh, rh; + if (t == NULL) + return 0; + lh = my_check_table_tree(t->left); + rh = my_check_table_tree(t->right); + if ((rh - lh) != t->balance) { + erts_fprintf(stderr, "Invalid tree balance for this node:\n"); + erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n", + t->balance, t->left, t->right); + erts_fprintf(stderr,"\nDump:\n---------------------------------\n"); + erts_fprintf(stderr,"\n---------------------------------\n"); + abort(); + } + return ((rh > lh) ? rh : lh) + 1; +} + +#endif diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h new file mode 100644 index 0000000000..1f2c7da091 --- /dev/null +++ b/erts/emulator/beam/erl_db_catree.h @@ -0,0 +1,91 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 1998-2016. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Implementation of ETS ordered_set table type with + * fine-grained synchronization. + * + * Author: Kjell Winblad + * + * "erl_db_catree.c" contains more details about the implementation. + * + */ + +#ifndef _DB_CATREE_H +#define _DB_CATREE_H + +struct DbTableCATreeNode; + +typedef struct { + erts_rwmtx_t lock; /* The lock for this base node */ + Sint lock_statistics; + int is_valid; /* If this base node is still valid */ + TreeDbTerm *root; /* The root of the sequential tree */ + DbTable * tab; /* Table ptr, used when freeing using thread progress */ + ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */ + struct DbTableCATreeNode * next; /* Used when gradually deleting */ +} DbTableCATreeBaseNode; + +typedef struct { + ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */ + DbTable* tab; /* Table ptr, used when freeing using thread progress */ + erts_mtx_t lock; /* Used when joining route nodes */ + int is_valid; /* If this route node is still valid */ + erts_atomic_t left; + erts_atomic_t right; + DbTerm key; +} DbTableCATreeRouteNode; + +typedef struct DbTableCATreeNode { + int is_base_node; + union { + DbTableCATreeRouteNode route; + DbTableCATreeBaseNode base; + } baseOrRoute; +} DbTableCATreeNode; + +typedef struct { + Uint pos; /* Current position on stack */ + Uint size; /* The size of the stack array */ + DbTableCATreeNode** array; /* The stack */ +} CATreeNodeStack; + +typedef struct db_table_catree { + DbTableCommon common; + + /* CA Tree-specific fields */ + erts_atomic_t root; /* The tree root (DbTableCATreeNode*) */ + Uint deletion; /* Being deleted */ + DbTreeStack free_stack_elems;/* Used for deletion ...*/ + CATreeNodeStack free_stack_rnodes; + DbTableCATreeNode *base_nodes_to_free_list; + int is_routing_nodes_freed; +} DbTableCATree; + +void db_initialize_catree(void); + +int db_create_catree(Process *p, DbTable *tbl); + + +#ifdef ERTS_ENABLE_LOCK_COUNT +void erts_lcnt_enable_db_catree_lock_count(DbTableCATree *tb, int enable); +#endif + +#endif /* _DB_CATREE_H */ diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 2eb874b005..f643344477 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -48,34 +48,13 @@ #include "erl_binary.h" #include "erl_db_tree.h" +#include "erl_db_tree_util.h" #define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos)) #define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems)) -/* -** A stack of this size is enough for an AVL tree with more than -** 0xFFFFFFFF elements. May be subject to change if -** the datatype of the element counter is changed to a 64 bit integer. -** The Maximal height of an AVL tree is calculated as: -** h(n) <= 1.4404 * log(n + 2) - 0.328 -** Where n denotes the number of nodes, h(n) the height of the tree -** with n nodes and log is the binary logarithm. -*/ - -#define STACK_NEED 50 #define TREE_MAX_ELEMENTS 0xFFFFFFFFUL -#define PUSH_NODE(Dtt, Tdt) \ - ((Dtt)->array[(Dtt)->pos++] = Tdt) - -#define POP_NODE(Dtt) \ - (((Dtt)->pos) ? \ - (Dtt)->array[--((Dtt)->pos)] : NULL) - -#define TOP_NODE(Dtt) \ - ((Dtt->pos) ? \ - (Dtt)->array[(Dtt)->pos - 1] : NULL) - #define TOPN_NODE(Dtt, Pos) \ (((Pos) < Dtt->pos) ? \ (Dtt)->array[(Dtt)->pos - ((Pos) + 1)] : NULL) @@ -89,9 +68,9 @@ /* Obtain table static stack if available. NULL if not. ** Must be released with release_stack() */ -static DbTreeStack* get_static_stack(DbTableTree* tb) +ERTS_INLINE static DbTreeStack* get_static_stack(DbTableTree* tb) { - if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) { + if (tb != NULL && !erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) { return &tb->static_stack; } return NULL; @@ -100,13 +79,14 @@ static DbTreeStack* get_static_stack(DbTableTree* tb) /* Obtain static stack if available, otherwise empty dynamic stack. ** Must be released with release_stack() */ -static DbTreeStack* get_any_stack(DbTableTree* tb) +static DbTreeStack* get_any_stack(DbTable* tb, DbTableTree* stack_container) { DbTreeStack* stack; - if (!erts_atomic_xchg_acqb(&tb->is_stack_busy, 1)) { - return &tb->static_stack; + if (stack_container != NULL && + !erts_atomic_xchg_acqb(&stack_container->is_stack_busy, 1)) { + return &stack_container->static_stack; } - stack = erts_db_alloc(ERTS_ALC_T_DB_STK, (DbTable *) tb, + stack = erts_db_alloc(ERTS_ALC_T_DB_STK, tb, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED); stack->pos = 0; stack->slot = 0; @@ -114,62 +94,59 @@ static DbTreeStack* get_any_stack(DbTableTree* tb) return stack; } -static void release_stack(DbTableTree* tb, DbTreeStack* stack) +static void release_stack(DbTable* tb, DbTableTree* stack_container, DbTreeStack* stack) { - if (stack == &tb->static_stack) { - ASSERT(erts_atomic_read_nob(&tb->is_stack_busy) == 1); - erts_atomic_set_relb(&tb->is_stack_busy, 0); + if (stack_container != NULL && stack == &stack_container->static_stack) { + ASSERT(erts_atomic_read_nob(&stack_container->is_stack_busy) == 1); + erts_atomic_set_relb(&stack_container->is_stack_busy, 0); } else { - erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb, + erts_db_free(ERTS_ALC_T_DB_STK, tb, (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED); } } -static ERTS_INLINE void reset_static_stack(DbTableTree* tb) +static ERTS_INLINE void reset_stack(DbTreeStack* stack) { - tb->static_stack.pos = 0; - tb->static_stack.slot = 0; + if (stack != NULL) { + stack->pos = 0; + stack->slot = 0; + } } -static ERTS_INLINE void free_term(DbTableTree *tb, TreeDbTerm* p) +static ERTS_INLINE void reset_static_stack(DbTableTree* tb) { - db_free_term((DbTable*)tb, p, offsetof(TreeDbTerm, dbterm)); + if (tb != NULL) { + reset_stack(&tb->static_stack); + } } -static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableTree *tb, Eterm obj) +static ERTS_INLINE TreeDbTerm* new_dbterm(DbTableCommon *tb, Eterm obj) { TreeDbTerm* p; - if (tb->common.compress) { - p = db_store_term_comp(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj); + if (tb->compress) { + p = db_store_term_comp(tb, NULL, offsetof(TreeDbTerm,dbterm), obj); } else { - p = db_store_term(&tb->common, NULL, offsetof(TreeDbTerm,dbterm), obj); + p = db_store_term(tb, NULL, offsetof(TreeDbTerm,dbterm), obj); } return p; } -static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old, +static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableCommon *tb, TreeDbTerm* old, Eterm obj) { TreeDbTerm* p; ASSERT(old != NULL); - if (tb->common.compress) { - p = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj); + if (tb->compress) { + p = db_store_term_comp(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj); } else { - p = db_store_term(&tb->common, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj); + p = db_store_term(tb, &(old->dbterm), offsetof(TreeDbTerm,dbterm), obj); } return p; } /* -** Some macros for "direction stacks" -*/ -#define DIR_LEFT 0 -#define DIR_RIGHT 1 -#define DIR_END 2 - -/* * Number of records to delete before trapping. */ #define DELETE_RECORD_LIMIT 12000 @@ -262,7 +239,9 @@ struct select_count_context { */ struct select_delete_context { Process *p; - DbTableTree *tb; + DbTableCommon *tb; + TreeDbTerm **root; + DbTreeStack *stack; Uint accum; Binary *mp; Eterm end_condition; @@ -277,7 +256,8 @@ struct select_delete_context { */ struct select_replace_context { Process *p; - DbTableTree *tb; + DbTableCommon *tb; + TreeDbTerm **root; Binary *mp; Eterm end_condition; Eterm *lastobj; @@ -292,73 +272,82 @@ typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Ete /* ** Forward declarations */ -static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key); -static TreeDbTerm *linkout_object_tree(DbTableTree *tb, - Eterm object); +static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root, + Eterm key, DbTreeStack *stack); +static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root, + Eterm object, DbTableTree *stack); static SWord do_free_tree_continue(DbTableTree *tb, SWord reds); -static void free_term(DbTableTree *tb, TreeDbTerm* p); -static int balance_left(TreeDbTerm **this); -static int balance_right(TreeDbTerm **this); +static void free_term(DbTable *tb, TreeDbTerm* p); +int tree_balance_left(TreeDbTerm **this); +int tree_balance_right(TreeDbTerm **this); static int delsub(TreeDbTerm **this); -static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot); -static TreeDbTerm *find_node(DbTableTree *tb, Eterm key); -static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key); -static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack*, TreeDbTerm *this); -static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key); -static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key); -static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*, - Eterm key); -static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack*, - Eterm key); -static void traverse_backwards(DbTableTree *tb, +static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, Sint slot, + DbTable *tb, DbTableTree *stack_container); +static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root, + Eterm key, DbTableTree *stack_container); +static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key); +static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root, + DbTreeStack *stack, TreeDbTerm *this); +static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key); +static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key); +static void traverse_backwards(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableTree *tb, + int (*doit)(DbTableCommon *, TreeDbTerm *, void *, int), void *context); -static void traverse_forward(DbTableTree *tb, +static void traverse_forward(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableTree *tb, + int (*doit)(DbTableCommon *tb, TreeDbTerm *, void *, int), void *context); -static void traverse_update_backwards(DbTableTree *tb, +static void traverse_update_backwards(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack*, Eterm lastkey, - int (*doit)(DbTableTree *tb, + int (*doit)(DbTableCommon *tb, TreeDbTerm **, // out void *, int), void *context); -static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret, - Eterm *partly_bound_key); +static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, + TreeDbTerm ***ret, Eterm *partly_bound); static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done); -static int analyze_pattern(DbTableTree *tb, Eterm pattern, +static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi); -static int doit_select(DbTableTree *tb, +static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward); -static int doit_select_count(DbTableTree *tb, +static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward); -static int doit_select_chunk(DbTableTree *tb, +static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward); -static int doit_select_delete(DbTableTree *tb, +static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward); -static int doit_select_replace(DbTableTree *tb, +static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this_ptr, void *ptr, int forward); @@ -508,18 +497,18 @@ int db_create_tree(Process *p, DbTable *tbl) return DB_ERROR_NONE; } -static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret) +int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm *ret, DbTableTree *stack_container) { - DbTableTree *tb = &tbl->tree; DbTreeStack* stack; TreeDbTerm *this; - if (( this = tb->root ) == NULL) { + if (( this = root ) == NULL) { *ret = am_EOT; return DB_ERROR_NONE; } /* Walk down the tree to the left */ - if ((stack = get_static_stack(tb)) != NULL) { + if ((stack = get_static_stack(stack_container)) != NULL) { stack->pos = stack->slot = 0; } while (this->left != NULL) { @@ -529,23 +518,27 @@ static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret) if (stack) { PUSH_NODE(stack, this); stack->slot = 1; - release_stack(tb,stack); + release_stack(tbl,stack_container,stack); } *ret = db_copy_key(p, tbl, &this->dbterm); return DB_ERROR_NONE; } -static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret) { DbTableTree *tb = &tbl->tree; - DbTreeStack* stack; + return db_first_tree_common(p, tbl, tb->root, ret, tb); +} + +int db_next_tree_common(Process *p, DbTable *tbl, + TreeDbTerm *root, Eterm key, + Eterm *ret, DbTreeStack* stack) +{ TreeDbTerm *this; if (is_atom(key) && key == am_EOT) return DB_ERROR_BADKEY; - stack = get_any_stack(tb); - this = find_next(tb, stack, key); - release_stack(tb,stack); + this = find_next(&tbl->common, root, stack, key); if (this == NULL) { *ret = am_EOT; return DB_ERROR_NONE; @@ -554,18 +547,27 @@ static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return DB_ERROR_NONE; } -static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret) +static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableTree *tb = &tbl->tree; + DbTreeStack* stack = get_any_stack(tbl, tb); + int ret_val = db_next_tree_common(p, tbl, tb->root, key, ret, stack); + release_stack(tbl,tb,stack); + return ret_val; +} + +int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm *ret, DbTableTree *stack_container) +{ TreeDbTerm *this; DbTreeStack* stack; - if (( this = tb->root ) == NULL) { + if (( this = root ) == NULL) { *ret = am_EOT; return DB_ERROR_NONE; } /* Walk down the tree to the right */ - if ((stack = get_static_stack(tb)) != NULL) { + if ((stack = get_static_stack(stack_container)) != NULL) { stack->pos = stack->slot = 0; } while (this->right != NULL) { @@ -574,24 +576,27 @@ static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret) } if (stack) { PUSH_NODE(stack, this); - stack->slot = NITEMS(tb); - release_stack(tb,stack); + stack->slot = NITEMS(tbl); + release_stack(tbl,stack_container,stack); } *ret = db_copy_key(p, tbl, &this->dbterm); return DB_ERROR_NONE; } -static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_last_tree_common(p, tbl, tb->root, ret, tb); +} + +int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key, + Eterm *ret, DbTreeStack* stack) +{ TreeDbTerm *this; - DbTreeStack* stack; if (is_atom(key) && key == am_EOT) return DB_ERROR_BADKEY; - stack = get_any_stack(tb); - this = find_prev(tb, stack, key); - release_stack(tb,stack); + this = find_prev(&tbl->common, root, stack, key); if (this == NULL) { *ret = am_EOT; return DB_ERROR_NONE; @@ -600,25 +605,30 @@ static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return DB_ERROR_NONE; } -static ERTS_INLINE Sint cmp_key(DbTableTree* tb, Eterm key, TreeDbTerm* obj) { - return CMP(key, GETKEY(tb,obj->dbterm.tpl)); +static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + DbTreeStack* stack = get_any_stack(tbl, tb); + int res = db_prev_tree_common(p, tbl, tb->root, key, ret, stack); + release_stack(tbl,tb,stack); + return res; } -static ERTS_INLINE int cmp_key_eq(DbTableTree* tb, Eterm key, TreeDbTerm* obj) { +static ERTS_INLINE int cmp_key_eq(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) { Eterm obj_key = GETKEY(tb,obj->dbterm.tpl); return is_same(key, obj_key) || CMP(key, obj_key) == 0; } -static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail) +int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj, + int key_clash_fail, DbTableTree *stack_container) { - DbTableTree *tb = &tbl->tree; /* Non recursive insertion in AVL tree, building our own stack */ TreeDbTerm **tstack[STACK_NEED]; int tpos = 0; int dstack[STACK_NEED+1]; int dpos = 0; int state = 0; - TreeDbTerm **this = &tb->root; + TreeDbTerm **this = root; Sint c; Eterm key; int dir; @@ -626,14 +636,14 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail) key = GETKEY(tb, tuple_val(obj)); - reset_static_stack(tb); + reset_static_stack(stack_container); dstack[dpos++] = DIR_END; for (;;) if (!*this) { /* Found our place */ state = 1; - if (erts_atomic_inc_read_nob(&tb->common.nitems) >= TREE_MAX_ELEMENTS) { - erts_atomic_dec_nob(&tb->common.nitems); + if (erts_atomic_inc_read_nob(&tb->nitems) >= TREE_MAX_ELEMENTS) { + erts_atomic_dec_nob(&tb->nitems); return DB_ERROR_SYSRES; } *this = new_dbterm(tb, obj); @@ -724,9 +734,15 @@ static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail) return DB_ERROR_NONE; } -static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail) { DbTableTree *tb = &tbl->tree; + return db_put_tree_common(&tb->common, &tb->root, obj, key_clash_fail, tb); +} + +int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key, + Eterm *ret, DbTableTree *stack_container) +{ Eterm copy; Eterm *hp, *hend; TreeDbTerm *this; @@ -737,13 +753,13 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) * The list created around it is purely for interface conformance. */ - this = find_node(tb,key); + this = find_node(tb,root,key,stack_container); if (this == NULL) { *ret = NIL; } else { hp = HAlloc(p, this->dbterm.size + 2); hend = hp + this->dbterm.size + 2; - copy = db_copy_object_from_ets(&tb->common, &this->dbterm, &hp, &MSO(p)); + copy = db_copy_object_from_ets(tb, &this->dbterm, &hp, &MSO(p)); *ret = CONS(hp, copy, NIL); hp += 2; HRelease(p,hend,hp); @@ -751,18 +767,28 @@ static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return DB_ERROR_NONE; } -static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret) +static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_get_tree_common(p, &tb->common, tb->root, key, ret, tb); +} - *ret = (find_node(tb,key) == NULL) ? am_false : am_true; +int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret, + DbTableTree *stack_container) +{ + *ret = (find_node(tb,root,key,stack_container) == NULL) ? am_false : am_true; return DB_ERROR_NONE; } -static int db_get_element_tree(Process *p, DbTable *tbl, - Eterm key, int ndex, Eterm *ret) +static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_member_tree_common(&tb->common, tb->root, key, ret, tb); +} + +int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key, + int ndex, Eterm *ret, DbTableTree *stack_container) +{ /* * Look the node up: */ @@ -776,49 +802,68 @@ static int db_get_element_tree(Process *p, DbTable *tbl, * around the element here either. */ - this = find_node(tb,key); + this = find_node(tb,root,key,stack_container); if (this == NULL) { return DB_ERROR_BADKEY; } else { if (ndex > arityval(this->dbterm.tpl[0])) { return DB_ERROR_BADPARAM; } - *ret = db_copy_element_from_ets(&tb->common, p, &this->dbterm, ndex, &hp, 0); + *ret = db_copy_element_from_ets(tb, p, &this->dbterm, ndex, &hp, 0); } return DB_ERROR_NONE; } -static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret) +static int db_get_element_tree(Process *p, DbTable *tbl, + Eterm key, int ndex, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_get_element_tree_common(p, &tb->common, tb->root, key, + ndex, ret, tb); +} + +int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret, + DbTreeStack *stack /* NULL if no static stack */) +{ TreeDbTerm *res; *ret = am_true; - if ((res = linkout_tree(tb, key)) != NULL) { - free_term(tb, res); + if ((res = linkout_tree(&tbl->common, root,key, stack)) != NULL) { + free_term(tbl, res); } return DB_ERROR_NONE; } -static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret) +static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_erase_tree_common(tbl, &tb->root, key, ret, &tb->static_stack); +} + +int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object, + Eterm *ret, DbTableTree *stack_container) +{ TreeDbTerm *res; *ret = am_true; - if ((res = linkout_object_tree(tb, object)) != NULL) { - free_term(tb, res); + if ((res = linkout_object_tree(&tbl->common, root, object, stack_container)) != NULL) { + free_term(tbl, res); } return DB_ERROR_NONE; } - -static int db_slot_tree(Process *p, DbTable *tbl, - Eterm slot_term, Eterm *ret) +static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_erase_object_tree_common(tbl, &tb->root, object, ret, tb); +} + +int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm slot_term, Eterm *ret, + DbTableTree *stack_container) +{ Sint slot; TreeDbTerm *st; Eterm *hp, *hend; @@ -834,10 +879,10 @@ static int db_slot_tree(Process *p, DbTable *tbl, if (is_not_small(slot_term) || ((slot = signed_val(slot_term)) < 0) || - (slot > NITEMS(tb))) + (slot > NITEMS(tbl))) return DB_ERROR_BADPARAM; - if (slot == NITEMS(tb)) { + if (slot == NITEMS(tbl)) { *ret = am_EOT; return DB_ERROR_NONE; } @@ -847,20 +892,27 @@ static int db_slot_tree(Process *p, DbTable *tbl, * are counted from 1 and up. */ ++slot; - st = slot_search(p, tb, slot); + st = slot_search(p, root, slot, tbl, stack_container); if (st == NULL) { *ret = am_false; return DB_ERROR_UNSPEC; } hp = HAlloc(p, st->dbterm.size + 2); hend = hp + st->dbterm.size + 2; - copy = db_copy_object_from_ets(&tb->common, &st->dbterm, &hp, &MSO(p)); + copy = db_copy_object_from_ets(&tbl->common, &st->dbterm, &hp, &MSO(p)); *ret = CONS(hp, copy, NIL); hp += 2; HRelease(p,hend,hp); return DB_ERROR_NONE; } +static int db_slot_tree(Process *p, DbTable *tbl, + Eterm slot_term, Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + return db_slot_tree_common(p, tbl, tb->root, slot_term, ret, tb); +} + static BIF_RETTYPE ets_select_reverse(BIF_ALIST_3) @@ -926,19 +978,14 @@ static BIF_RETTYPE bif_trap3(Export *bif, { BIF_TRAP3(bif, p, p1, p2, p3); } - -/* -** This is called either when the select bif traps or when ets:select/1 -** is called. It does mostly the same as db_select_tree and may in either case -** trap to itself again (via the ets:select/1 bif). -** Note that this is common for db_select_tree and db_select_chunk_tree. -*/ -static int db_select_continue_tree(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) + +int db_select_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container) { - DbTableTree *tb = &tbl->tree; DbTreeStack* stack; struct select_context sc; unsigned sz; @@ -980,26 +1027,26 @@ static int db_select_continue_tree(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; sc.chunk_size = chunk_size; reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); - stack = get_any_stack(tb); + stack = get_any_stack((DbTable*)tb, stack_container); if (chunk_size) { if (reverse) { - traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc); + traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); } else { - traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc); + traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); } } else { if (reverse) { - traverse_forward(tb, stack, lastkey, &doit_select, &sc); + traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); } else { - traverse_backwards(tb, stack, lastkey, &doit_select, &sc); + traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); } } - release_stack(tb,stack); + release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); @@ -1082,13 +1129,28 @@ static int db_select_continue_tree(Process *p, #undef RET_TO_BIF } + +/* +** This is called either when the select bif traps or when ets:select/1 +** is called. It does mostly the same as db_select_tree and may in either case +** trap to itself again (via the ets:select/1 bif). +** Note that this is common for db_select_tree and db_select_chunk_tree. +*/ +static int db_select_continue_tree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + return db_select_continue_tree_common(p, &tb->common, &tb->root, + continuation, ret, tb); +} - -static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret) +int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, int reverse, Eterm *ret, + DbTableTree *stack_container) { /* Strategy: Traverse backwards to build resulting list from tail to head */ - DbTableTree *tb = &tbl->tree; DbTreeStack* stack; struct select_context sc; struct mp_info mpi; @@ -1117,11 +1179,11 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; sc.got = 0; sc.chunk_size = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1138,25 +1200,25 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, RET_TO_BIF(sc.accum,DB_ERROR_NONE); } - stack = get_any_stack(tb); + stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.some_limitation) { - if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) { + if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.most; } - traverse_forward(tb, stack, lastkey, &doit_select, &sc); + traverse_forward(tb, root, stack, lastkey, &doit_select, &sc); } else { if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) { + if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.least; } - traverse_backwards(tb, stack, lastkey, &doit_select, &sc); + traverse_backwards(tb, root, stack, lastkey, &doit_select, &sc); } - release_stack(tb,stack); + release_stack((DbTable*)tb,stack_container,stack); #ifdef HARDDEBUG erts_fprintf(stderr,"Least: %T\n", mpi.least); erts_fprintf(stderr,"Most: %T\n", mpi.most); @@ -1192,16 +1254,21 @@ static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, } - -/* -** This is called either when the select_count bif traps. -*/ -static int db_select_count_continue_tree(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) +static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, int reverse, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_tree_common(p, &tb->common, &tb->root, tid, + pattern, reverse, ret, tb); +} + +int db_select_count_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container) +{ DbTreeStack* stack; struct select_count_context sc; unsigned sz; @@ -1238,16 +1305,16 @@ static int db_select_count_continue_tree(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; if (is_big(tptr[5])) { sc.got = big_to_uint32(tptr[5]); } else { sc.got = unsigned_val(tptr[5]); } - stack = get_any_stack(tb); - traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc); - release_stack(tb,stack); + stack = get_any_stack((DbTable*)tb, stack_container); + traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); + release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); @@ -1285,11 +1352,24 @@ static int db_select_count_continue_tree(Process *p, #undef RET_TO_BIF } - -static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) +/* +** This is called either when the select_count bif traps. +*/ +static int db_select_count_continue_tree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_count_continue_tree_common(p, &tb->common, &tb->root, + continuation, ret, tb); +} + + +int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Eterm *ret, + DbTableTree *stack_container) +{ DbTreeStack* stack; struct select_count_context sc; struct mp_info mpi; @@ -1318,10 +1398,10 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; sc.got = 0; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1338,16 +1418,16 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); } - stack = get_any_stack(tb); + stack = get_any_stack((DbTable*)tb, stack_container); if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) { + if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.least; } - traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc); - release_stack(tb,stack); + traverse_backwards(tb, root, stack, lastkey, &doit_select_count, &sc); + release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); @@ -1383,12 +1463,20 @@ static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, } -static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Sint chunk_size, - int reverse, - Eterm *ret) +static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_count_tree_common(p, &tb->common, &tb->root, + tid, pattern, ret, tb); +} + + +int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Sint chunk_size, + int reverse, Eterm *ret, + DbTableTree *stack_container) +{ DbTreeStack* stack; struct select_context sc; struct mp_info mpi; @@ -1417,11 +1505,11 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; sc.got = 0; sc.chunk_size = chunk_size; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1443,25 +1531,25 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, } } - stack = get_any_stack(tb); + stack = get_any_stack((DbTable*)tb,stack_container); if (reverse) { if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) { + if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.least; } - traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc); + traverse_backwards(tb, root, stack, lastkey, &doit_select_chunk, &sc); } else { if (mpi.some_limitation) { - if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) { + if ((this = find_prev_from_pb_key(tb, *root, stack, mpi.least)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.most; } - traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc); + traverse_forward(tb, root, stack, lastkey, &doit_select_chunk, &sc); } - release_stack(tb,stack); + release_stack((DbTable*)tb,stack_container,stack); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0 || sc.got == chunk_size) { @@ -1530,15 +1618,25 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, } -/* -** This is called when select_delete traps -*/ -static int db_select_delete_continue_tree(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) +static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Sint chunk_size, + int reverse, + Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_chunk_tree_common(p, &tb->common, &tb->root, + tid, pattern, chunk_size, + reverse, ret, tb); +} + + +int db_select_delete_continue_tree_common(Process *p, + DbTable *tbl, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTreeStack* stack) +{ struct select_delete_context sc; unsigned sz; Eterm *hp; @@ -1552,7 +1650,7 @@ static int db_select_delete_continue_tree(Process *p, #define RET_TO_BIF(Term, State) do { \ if (sc.erase_lastterm) { \ - free_term(tb, sc.lastterm); \ + free_term(tbl, sc.lastterm); \ } \ *ret = (Term); \ return State; \ @@ -1571,7 +1669,9 @@ static int db_select_delete_continue_tree(Process *p, mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); sc.p = p; - sc.tb = tb; + sc.tb = &tbl->common; + sc.root = root; + sc.stack = stack; if (is_big(tptr[5])) { sc.accum = big_to_uint32(tptr[5]); } else { @@ -1580,17 +1680,16 @@ static int db_select_delete_continue_tree(Process *p, sc.mp = mp; sc.end_condition = NIL; sc.max = 1000; - sc.keypos = tb->common.keypos; + sc.keypos = tbl->common.keypos; - ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); - traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc); + traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE); } - key = GETKEY(tb, (sc.lastterm)->dbterm.tpl); + key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl); if (end_condition != NIL && cmp_partly_bound(end_condition,key) > 0) { /* done anyway */ RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); @@ -1620,10 +1719,24 @@ static int db_select_delete_continue_tree(Process *p, #undef RET_TO_BIF } -static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) +static int db_select_delete_continue_tree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) { DbTableTree *tb = &tbl->tree; + ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); + return db_select_delete_continue_tree_common(p, tbl, &tb->root, + continuation, ret, + &tb->static_stack); +} + +int db_select_delete_tree_common(Process *p, DbTable *tbl, + TreeDbTerm **root, + Eterm tid, Eterm pattern, + Eterm *ret, + DbTreeStack* stack) +{ struct select_delete_context sc; struct mp_info mpi; Eterm lastkey = THE_NON_VALUE; @@ -1641,7 +1754,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, erts_bin_free(mpi.mp); \ } \ if (sc.erase_lastterm) { \ - free_term(tb, sc.lastterm); \ + free_term(tbl, sc.lastterm); \ } \ *ret = (Term); \ return RetVal; \ @@ -1655,10 +1768,12 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, sc.p = p; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->common.keypos; - sc.tb = tb; + sc.keypos = tbl->common.keypos; + sc.tb = &tbl->common; + sc.root = root; + sc.stack = stack; - if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(&tbl->common, root, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(0,errcode); } @@ -1671,26 +1786,26 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { - doit_select_delete(tb,*(mpi.save_term),&sc, 0 /* direction doesn't + doit_select_delete(&tbl->common,*(mpi.save_term),&sc, 0 /* direction doesn't matter */); RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); } if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, &tb->static_stack, mpi.most)) != NULL) { - lastkey = GETKEY(tb, this->dbterm.tpl); + if ((this = find_next_from_pb_key(&tbl->common, *root, stack, mpi.most)) != NULL) { + lastkey = GETKEY(&tbl->common, this->dbterm.tpl); } sc.end_condition = mpi.least; } - traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc); + traverse_backwards(&tbl->common, root, stack, lastkey, &doit_select_delete, &sc); BUMP_REDS(p, 1000 - sc.max); if (sc.max > 0) { RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE); } - key = GETKEY(tb, (sc.lastterm)->dbterm.tpl); + key = GETKEY(&tbl->common, (sc.lastterm)->dbterm.tpl); sz = size_object(key); if (IS_USMALL(0, sc.accum)) { hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); @@ -1714,7 +1829,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, /* Don't free mpi.mp, so don't use macro */ if (sc.erase_lastterm) { - free_term(tb, sc.lastterm); + free_term(tbl, sc.lastterm); } *ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation); return DB_ERROR_NONE; @@ -1723,12 +1838,21 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, } -static int db_select_replace_continue_tree(Process *p, - DbTable *tbl, - Eterm continuation, - Eterm *ret) +static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_delete_tree_common(p, tbl, &tb->root, tid, pattern, ret, + &tb->static_stack); +} + +int db_select_replace_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container) +{ DbTreeStack* stack; struct select_replace_context sc; unsigned sz; @@ -1764,7 +1888,7 @@ static int db_select_replace_continue_tree(Process *p, sc.end_condition = NIL; sc.lastobj = NULL; sc.max = 1000; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; if (is_big(tptr[5])) { sc.replaced = big_to_uint32(tptr[5]); } else { @@ -1772,9 +1896,9 @@ static int db_select_replace_continue_tree(Process *p, } prev_replaced = sc.replaced; - stack = get_any_stack(tb); - traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc); - release_stack(tb,stack); + stack = get_any_stack((DbTable*)tb,stack_container); + traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); + release_stack((DbTable*)tb,stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced))); @@ -1813,10 +1937,20 @@ static int db_select_replace_continue_tree(Process *p, #undef RET_TO_BIF } -static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) +static int db_select_replace_continue_tree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_replace_continue_tree_common(p, &tb->common, &tb->root, + continuation, ret, tb); +} + +int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Eterm *ret, + DbTableTree *stack_container) +{ DbTreeStack* stack; struct select_replace_context sc; struct mp_info mpi; @@ -1844,12 +1978,13 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, sc.lastobj = NULL; sc.p = p; sc.tb = tb; + sc.root = root; sc.max = 1000; sc.end_condition = NIL; - sc.keypos = tb->common.keypos; + sc.keypos = tb->keypos; sc.replaced = 0; - if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, root, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1860,7 +1995,7 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, sc.mp = mpi.mp; - stack = get_static_stack(tb); + stack = get_static_stack(stack_container); if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { TreeDbTerm* term = *(mpi.save_term); @@ -1869,23 +2004,23 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, if (TOP_NODE(stack) == term) // throw away potentially invalid reference REPLACE_TOP_NODE(stack, *(mpi.save_term)); - release_stack(tb, stack); + release_stack((DbTable*)tb,stack_container, stack); } RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); } if (stack == NULL) - stack = get_any_stack(tb); + stack = get_any_stack((DbTable*)tb,stack_container); if (mpi.some_limitation) { - if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) { + if ((this = find_next_from_pb_key(tb, *root, stack, mpi.most)) != NULL) { lastkey = GETKEY(tb, this->dbterm.tpl); } sc.end_condition = mpi.least; } - traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc); - release_stack(tb,stack); + traverse_update_backwards(tb, root, stack, lastkey, &doit_select_replace, &sc); + release_stack((DbTable*)tb,stack_container,stack); // the more objects we've replaced, the more reductions we've consumed BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced)); if (sc.max > 0) { @@ -1922,52 +2057,73 @@ static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, } -static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; + return db_select_replace_tree_common(p, &tb->common, &tb->root, + tid, pattern, ret, tb); +} + +int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, + Eterm key, Eterm *ret, + DbTreeStack *stack /* NULL if no static stack */) +{ TreeDbTerm *this; *ret = NIL; - this = linkout_tree(tb, key); + this = linkout_tree(&tbl->common, root, key, stack); if (this) { Eterm copy, *hp, *hend; hp = HAlloc(p, this->dbterm.size + 2); hend = hp + this->dbterm.size + 2; - copy = db_copy_object_from_ets(&tb->common, + copy = db_copy_object_from_ets(&tbl->common, &this->dbterm, &hp, &MSO(p)); *ret = CONS(hp, copy, NIL); hp += 2; HRelease(p, hend, hp); - free_term(tb, this); + free_term(tbl, this); } return DB_ERROR_NONE; } +static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + return db_take_tree_common(p, tbl, &tb->root, + key, ret, &tb->static_stack); +} + /* ** Other interface routines (not directly coupled to one bif) */ - -/* Display tree contents (for dump) */ -static void db_print_tree(fmtfn_t to, void *to_arg, - int show, - DbTable *tbl) +void db_print_tree_common(fmtfn_t to, void *to_arg, + int show, TreeDbTerm *root, DbTable *tbl) { - DbTableTree *tb = &tbl->tree; #ifdef TREE_DEBUG if (show) erts_print(to, to_arg, "\nTree data dump:\n" "------------------------------------------------\n"); - do_dump_tree2(&tbl->tree, to, to_arg, show, tb->root, 0); + do_dump_tree2(&tbl->common, to, to_arg, show, root, 0); if (show) erts_print(to, to_arg, "\n" "------------------------------------------------\n"); #else - erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tb)); + erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tbl)); #endif } +/* Display tree contents (for dump) */ +static void db_print_tree(fmtfn_t to, void *to_arg, + int show, + DbTable *tbl) +{ + DbTableTree *tb = &tbl->tree; + db_print_tree_common(to, to_arg, show, tb->root, tbl); +} + /* release all memory occupied by a single table */ static int db_free_empty_table_tree(DbTable *tbl) { @@ -1992,8 +2148,10 @@ static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds) (DbTable *) tb, (void *) tb->static_stack.array, sizeof(TreeDbTerm *) * STACK_NEED); - ASSERT(erts_atomic_read_nob(&tb->common.memory_size) - == sizeof(DbTable)); + ASSERT((erts_atomic_read_nob(&tb->common.memory_size) + == sizeof(DbTable)) || + (erts_atomic_read_nob(&tb->common.memory_size) + == (sizeof(DbTable) + sizeof(DbFixation)))); } return reds; } @@ -2012,11 +2170,18 @@ static void do_db_tree_foreach_offheap(TreeDbTerm *, void (*)(ErlOffHeap *, void *), void *); +void db_foreach_offheap_tree_common(TreeDbTerm *root, + void (*func)(ErlOffHeap *, void *), + void * arg) +{ + do_db_tree_foreach_offheap(root, func, arg); +} + static void db_foreach_offheap_tree(DbTable *tbl, void (*func)(ErlOffHeap *, void *), void * arg) { - do_db_tree_foreach_offheap(tbl->tree.root, func, arg); + db_foreach_offheap_tree_common(tbl->tree.root, func, arg); } @@ -2041,13 +2206,14 @@ do_db_tree_foreach_offheap(TreeDbTerm *tdbt, do_db_tree_foreach_offheap(tdbt->right, func, arg); } -static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) { +static TreeDbTerm *linkout_tree(DbTableCommon *tb, TreeDbTerm **root, + Eterm key, DbTreeStack *stack) { TreeDbTerm **tstack[STACK_NEED]; int tpos = 0; int dstack[STACK_NEED+1]; int dpos = 0; int state = 0; - TreeDbTerm **this = &tb->root; + TreeDbTerm **this = root; Sint c; int dir; TreeDbTerm *q = NULL; @@ -2058,7 +2224,7 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) { * keep the balance. As in insert, we do the stacking ourselves. */ - reset_static_stack(tb); + reset_stack(stack); dstack[dpos++] = DIR_END; for (;;) { if (!*this) { /* Failure */ @@ -2084,30 +2250,30 @@ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key) { tstack[tpos++] = this; state = delsub(this); } - erts_atomic_dec_nob(&tb->common.nitems); + erts_atomic_dec_nob(&tb->nitems); break; } } while (state && ( dir = dstack[--dpos] ) != DIR_END) { this = tstack[--tpos]; if (dir == DIR_LEFT) { - state = balance_left(this); + state = tree_balance_left(this); } else { - state = balance_right(this); + state = tree_balance_right(this); } } return q; } -static TreeDbTerm *linkout_object_tree(DbTableTree *tb, - Eterm object) +static TreeDbTerm *linkout_object_tree(DbTableCommon *tb, TreeDbTerm **root, + Eterm object, DbTableTree *stack) { TreeDbTerm **tstack[STACK_NEED]; int tpos = 0; int dstack[STACK_NEED+1]; int dpos = 0; int state = 0; - TreeDbTerm **this = &tb->root; + TreeDbTerm **this = root; Sint c; int dir; TreeDbTerm *q = NULL; @@ -2122,7 +2288,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb, key = GETKEY(tb, tuple_val(object)); - reset_static_stack(tb); + reset_static_stack(stack); dstack[dpos++] = DIR_END; for (;;) { if (!*this) { /* Failure */ @@ -2136,7 +2302,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb, tstack[tpos++] = this; this = &((*this)->right); } else { /* Equal key, found the only possible matching object*/ - if (!db_eq(&tb->common,object,&(*this)->dbterm)) { + if (!db_eq(tb,object,&(*this)->dbterm)) { return NULL; } q = (*this); @@ -2151,16 +2317,16 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb, tstack[tpos++] = this; state = delsub(this); } - erts_atomic_dec_nob(&tb->common.nitems); + erts_atomic_dec_nob(&tb->nitems); break; } } while (state && ( dir = dstack[--dpos] ) != DIR_END) { this = tstack[--tpos]; if (dir == DIR_LEFT) { - state = balance_left(this); + state = tree_balance_left(this); } else { - state = balance_right(this); + state = tree_balance_right(this); } } return q; @@ -2170,7 +2336,7 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb, ** For the select functions, analyzes the pattern and determines which ** part of the tree should be searched. Also compiles the match program */ -static int analyze_pattern(DbTableTree *tb, Eterm pattern, +static int analyze_pattern(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi) { @@ -2231,7 +2397,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, guards[i] = guard = ptpl[2]; bodies[i] = body = ptpl[3]; - if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) { + if(extra_validator != NULL && !extra_validator(tb->keypos, match, guard, body)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } @@ -2244,7 +2410,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, ++i; partly_bound = NIL; - res = key_given(tb, tpl, &(mpi->save_term), &partly_bound); + res = key_given(tb, root, tpl, &(mpi->save_term), &partly_bound); if ( res >= 0 ) { /* Can match something */ key = 0; mpi->something_can_match = 1; @@ -2308,7 +2474,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds) PUSH_NODE(&tb->static_stack, root); root = p; } else { - free_term(tb, root); + free_term((DbTable*)tb, root); if (--reds < 0) { return reds; /* Done enough for now */ } @@ -2322,7 +2488,7 @@ static SWord do_free_tree_continue(DbTableTree *tb, SWord reds) /* * Deletion helpers */ -static int balance_left(TreeDbTerm **this) +int tree_balance_left(TreeDbTerm **this) { TreeDbTerm *p, *p1, *p2; int b1, b2, h = 1; @@ -2367,7 +2533,7 @@ static int balance_left(TreeDbTerm **this) return h; } -static int balance_right(TreeDbTerm **this) +int tree_balance_right(TreeDbTerm **this) { TreeDbTerm *p, *p1, *p2; int b1, b2, h = 1; @@ -2439,7 +2605,7 @@ static int delsub(TreeDbTerm **this) h = 1; while (tpos && h) { r = tstack[--tpos]; - h = balance_right(r); + h = tree_balance_right(r); } return h; } @@ -2448,11 +2614,13 @@ static int delsub(TreeDbTerm **this) * Helper for db_slot */ -static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot) +static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root, + Sint slot, DbTable *tb, + DbTableTree *stack_container) { TreeDbTerm *this; TreeDbTerm *tmp; - DbTreeStack* stack = get_any_stack(tb); + DbTreeStack* stack = get_any_stack(tb,stack_container); ASSERT(stack != NULL); if (slot == 1) { /* Don't search from where we are if we are @@ -2465,7 +2633,7 @@ static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot) stack->pos = 0; } if (EMPTY_NODE(stack)) { - this = tb->root; + this = root; if (this == NULL) goto done; while (this->left != NULL){ @@ -2514,7 +2682,7 @@ static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot) } } done: - release_stack(tb,stack); + release_stack(tb,stack_container,stack); return this; } @@ -2522,7 +2690,8 @@ done: * Find next and previous in sort order */ -static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) { +static TreeDbTerm *find_next(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key) { TreeDbTerm *this; TreeDbTerm *tmp; Sint c; @@ -2534,7 +2703,7 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) { } } if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */ - if (( this = tb->root ) == NULL) + if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); @@ -2578,7 +2747,8 @@ static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key) { return this; } -static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) { +static TreeDbTerm *find_prev(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key) { TreeDbTerm *this; TreeDbTerm *tmp; Sint c; @@ -2590,7 +2760,7 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) { } } if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */ - if (( this = tb->root ) == NULL) + if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); @@ -2634,8 +2804,8 @@ static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key) { return this; } -static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack, - Eterm key) +static TreeDbTerm *find_next_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key) { TreeDbTerm *this; TreeDbTerm *tmp; @@ -2643,7 +2813,7 @@ static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack, /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; - if (( this = tb->root ) == NULL) + if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); @@ -2667,8 +2837,8 @@ static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack, } } -static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack, - Eterm key) +static TreeDbTerm *find_prev_from_pb_key(DbTableCommon *tb, TreeDbTerm *root, + DbTreeStack* stack, Eterm key) { TreeDbTerm *this; TreeDbTerm *tmp; @@ -2676,7 +2846,7 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack, /* spool the stack, we have to "re-search" */ stack->pos = stack->slot = 0; - if (( this = tb->root ) == NULL) + if (( this = root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, this); @@ -2704,16 +2874,17 @@ static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack, /* * Just lookup a node */ -static TreeDbTerm *find_node(DbTableTree *tb, Eterm key) +static TreeDbTerm *find_node(DbTableCommon *tb, TreeDbTerm *root, + Eterm key, DbTableTree *stack_container) { TreeDbTerm *this; Sint res; - DbTreeStack* stack = get_static_stack(tb); + DbTreeStack* stack = get_static_stack(stack_container); if(!stack || EMPTY_NODE(stack) || !cmp_key_eq(tb, key, (this=TOP_NODE(stack)))) { - this = tb->root; + this = root; while (this != NULL && (res = cmp_key(tb,key,this)) != 0) { if (res < 0) this = this->left; @@ -2722,7 +2893,7 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key) } } if (stack) { - release_stack(tb,stack); + release_stack((DbTable*)tb,stack_container,stack); } return this; } @@ -2730,12 +2901,12 @@ static TreeDbTerm *find_node(DbTableTree *tb, Eterm key) /* * Lookup a node and return the address of the node pointer in the tree */ -static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key) +static TreeDbTerm **find_node2(DbTableCommon *tb, TreeDbTerm **root, Eterm key) { TreeDbTerm **this; Sint res; - this = &tb->root; + this = root; while ((*this) != NULL && (res = cmp_key(tb, key, *this)) != 0) { if (res < 0) this = &((*this)->left); @@ -2752,7 +2923,8 @@ static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key) * Tries to reuse the existing stack for performance. */ -static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *this) { +static TreeDbTerm **find_ptr(DbTableCommon *tb, TreeDbTerm **root, + DbTreeStack *stack, TreeDbTerm *this) { Eterm key = GETKEY(tb, this->dbterm.tpl); TreeDbTerm *tmp; TreeDbTerm *parent; @@ -2765,7 +2937,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th } } if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */ - if (( tmp = tb->root ) == NULL) + if (( tmp = *root ) == NULL) return NULL; for (;;) { PUSH_NODE(stack, tmp); @@ -2791,7 +2963,7 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th parent = TOPN_NODE(stack, 1); if (parent == NULL) - return ((this != tb->root) ? NULL : &(tb->root)); + return ((this != *root) ? NULL : root); if (parent->left == this) return &(parent->left); if (parent->right == this) @@ -2799,12 +2971,11 @@ static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *th return NULL; } -static int -db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj, - DbUpdateHandle* handle) +int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, + Eterm key, Eterm obj, DbUpdateHandle* handle, + DbTableTree *stack_container) { - DbTableTree *tb = &tbl->tree; - TreeDbTerm **pp = find_node2(tb, key); + TreeDbTerm **pp = find_node2(&tbl->common, root, key); int flags = 0; if (pp == NULL) { @@ -2815,18 +2986,19 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj, int arity = arityval(*objp); Eterm *htop, *hend; - ASSERT(arity >= tb->common.keypos); + ASSERT(arity >= tbl->common.keypos); htop = HAlloc(p, arity + 1); hend = htop + arity + 1; sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1)); - htop[tb->common.keypos] = key; + htop[tbl->common.keypos] = key; obj = make_tuple(htop); - if (db_put_tree(tbl, obj, 1) != DB_ERROR_NONE) { + if (db_put_tree_common(&tbl->common, root, + obj, 1, stack_container) != DB_ERROR_NONE) { return 0; } - pp = find_node2(tb, key); + pp = find_node2(&tbl->common, root, key); ASSERT(pp != NULL); HRelease(p, hend, htop); flags |= DB_NEW_OBJECT; @@ -2841,21 +3013,28 @@ db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj, return 1; } -static void -db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) +static int +db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj, + DbUpdateHandle* handle) { - DbTable *tbl = handle->tb; DbTableTree *tb = &tbl->tree; + return db_lookup_dbterm_tree_common(p, tbl, &tb->root, key, obj, handle, tb); +} + +void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle, + DbTableTree *stack_container) +{ + DbTable *tbl = handle->tb; TreeDbTerm *bp = (TreeDbTerm *) *handle->bp; if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) { Eterm ret; - db_erase_tree(tbl, GETKEY(tb, bp->dbterm.tpl), &ret); + db_erase_tree(tbl, GETKEY(&tbl->common, bp->dbterm.tpl), &ret); } else if (handle->flags & DB_MUST_RESIZE) { db_finalize_resize(handle, offsetof(TreeDbTerm,dbterm)); - reset_static_stack(tb); + reset_static_stack(stack_container); - free_term(tb, bp); + free_term(tbl, bp); } #ifdef DEBUG handle->dbterm = 0; @@ -2863,13 +3042,22 @@ db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) return; } +static void +db_finalize_dbterm_tree(int cret, DbUpdateHandle *handle) +{ + DbTable *tbl = handle->tb; + DbTableTree *tb = &tbl->tree; + db_finalize_dbterm_tree_common(cret, handle, tb); +} + /* * Traverse the tree with a callback function, used by db_match_xxx */ -static void traverse_backwards(DbTableTree *tb, +static void traverse_backwards(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableTree *, + int (*doit)(DbTableCommon *, TreeDbTerm *, void *, int), @@ -2879,7 +3067,7 @@ static void traverse_backwards(DbTableTree *tb, if (lastkey == THE_NON_VALUE) { stack->pos = stack->slot = 0; - if (( this = tb->root ) == NULL) { + if (( this = *root ) == NULL) { return; } while (this != NULL) { @@ -2887,15 +3075,15 @@ static void traverse_backwards(DbTableTree *tb, this = this->right; } this = TOP_NODE(stack); - next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl)); + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); if (!((*doit)(tb, this, context, 0))) return; } else { - next = find_prev(tb, stack, lastkey); + next = find_prev(tb, *root, stack, lastkey); } while ((this = next) != NULL) { - next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl)); + next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); if (!((*doit)(tb, this, context, 0))) return; } @@ -2904,10 +3092,11 @@ static void traverse_backwards(DbTableTree *tb, /* * Traverse the tree with a callback function, used by db_match_xxx */ -static void traverse_forward(DbTableTree *tb, +static void traverse_forward(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableTree *, + int (*doit)(DbTableCommon *, TreeDbTerm *, void *, int), @@ -2917,7 +3106,7 @@ static void traverse_forward(DbTableTree *tb, if (lastkey == THE_NON_VALUE) { stack->pos = stack->slot = 0; - if (( this = tb->root ) == NULL) { + if (( this = *root ) == NULL) { return; } while (this != NULL) { @@ -2925,15 +3114,15 @@ static void traverse_forward(DbTableTree *tb, this = this->left; } this = TOP_NODE(stack); - next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl)); + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); if (!((*doit)(tb, this, context, 1))) return; } else { - next = find_next(tb, stack, lastkey); + next = find_next(tb, *root, stack, lastkey); } while ((this = next) != NULL) { - next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl)); + next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl)); if (!((*doit)(tb, this, context, 1))) return; } @@ -2942,10 +3131,11 @@ static void traverse_forward(DbTableTree *tb, /* * Traverse the tree with an update callback function, used by db_select_replace */ -static void traverse_update_backwards(DbTableTree *tb, +static void traverse_update_backwards(DbTableCommon *tb, + TreeDbTerm **root, DbTreeStack* stack, Eterm lastkey, - int (*doit)(DbTableTree*, + int (*doit)(DbTableCommon*, TreeDbTerm**, void*, int), @@ -2956,7 +3146,7 @@ static void traverse_update_backwards(DbTableTree *tb, if (lastkey == THE_NON_VALUE) { stack->pos = stack->slot = 0; - if (( this = tb->root ) == NULL) { + if (( this = *root ) == NULL) { return; } while (this != NULL) { @@ -2964,23 +3154,23 @@ static void traverse_update_backwards(DbTableTree *tb, this = this->right; } this = TOP_NODE(stack); - this_ptr = find_ptr(tb, stack, this); + this_ptr = find_ptr(tb, root, stack, this); ASSERT(this_ptr != NULL); res = (*doit)(tb, this_ptr, context, 0); REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); if (!res) return; } else { - next = find_prev(tb, stack, lastkey); + next = find_prev(tb, *root, stack, lastkey); } while ((this = next) != NULL) { - this_ptr = find_ptr(tb, stack, this); + this_ptr = find_ptr(tb, root, stack, this); ASSERT(this_ptr != NULL); res = (*doit)(tb, this_ptr, context, 0); REPLACE_TOP_NODE(stack, *this_ptr); - next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); if (!res) return; } @@ -2990,8 +3180,8 @@ static void traverse_update_backwards(DbTableTree *tb, * Returns 0 if not given 1 if given and -1 on no possible match * if key is given; *ret is set to point to the object concerned. */ -static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret, - Eterm *partly_bound) +static int key_given(DbTableCommon *tb, TreeDbTerm **root, Eterm pattern, + TreeDbTerm ***ret, Eterm *partly_bound) { TreeDbTerm **this; Eterm key; @@ -2999,11 +3189,11 @@ static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret, ASSERT(ret != NULL); if (pattern == am_Underscore || db_is_variable(pattern) != -1) return 0; - key = db_getkey(tb->common.keypos, pattern); + key = db_getkey(tb->keypos, pattern); if (is_non_value(key)) return -1; /* can't possibly match anything */ if (!db_has_variable(key)) { /* Bound key */ - if (( this = find_node2(tb, key) ) == NULL) { + if (( this = find_node2(tb, root, key) ) == NULL) { return -1; } *ret = this; @@ -3296,7 +3486,7 @@ static int do_partly_bound_can_match_greater(Eterm a, Eterm b, * Callback functions for the different match functions */ -static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr, +static int doit_select(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3314,7 +3504,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0))) { return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2); + ret = db_match_dbterm(tb, sc->p,sc->mp, &this->dbterm, &hp, 2); if (is_value(ret)) { sc->accum = CONS(hp, ret, sc->accum); } @@ -3331,7 +3521,7 @@ static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr, +static int doit_select_count(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward) { struct select_count_context *sc = (struct select_count_context *) ptr; @@ -3345,7 +3535,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0)) { return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0); + ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { ++(sc->got); } @@ -3355,7 +3545,7 @@ static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr, +static int doit_select_chunk(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward) { struct select_context *sc = (struct select_context *) ptr; @@ -3374,7 +3564,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, &hp, 2); + ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, &hp, 2); if (is_value(ret)) { ++(sc->got); sc->accum = CONS(hp, ret, sc->accum); @@ -3393,7 +3583,7 @@ static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr, } -static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, +static int doit_select_delete(DbTableCommon *tb, TreeDbTerm *this, void *ptr, int forward) { struct select_delete_context *sc = (struct select_delete_context *) ptr; @@ -3401,7 +3591,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, Eterm key; if (sc->erase_lastterm) - free_term(tb, sc->lastterm); + free_term((DbTable*)tb, sc->lastterm); sc->erase_lastterm = 0; sc->lastterm = this; @@ -3409,10 +3599,10 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, cmp_partly_bound(sc->end_condition, GETKEY_WITH_POS(sc->keypos, this->dbterm.tpl)) > 0) return 0; - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &this->dbterm, NULL, 0); + ret = db_match_dbterm(tb, sc->p, sc->mp, &this->dbterm, NULL, 0); if (ret == am_true) { key = GETKEY(sc->tb, this->dbterm.tpl); - linkout_tree(sc->tb, key); + linkout_tree(sc->tb, sc->root, key, sc->stack); sc->erase_lastterm = 1; ++sc->accum; } @@ -3422,7 +3612,7 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 1; } -static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, +static int doit_select_replace(DbTableCommon *tb, TreeDbTerm **this, void *ptr, int forward) { struct select_replace_context *sc = (struct select_replace_context *) ptr; @@ -3436,13 +3626,13 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) { return 0; } - ret = db_match_dbterm(&tb->common, sc->p, sc->mp, &(*this)->dbterm, NULL, 0); + ret = db_match_dbterm(tb, sc->p, sc->mp, &(*this)->dbterm, NULL, 0); if (is_value(ret)) { TreeDbTerm* new; TreeDbTerm* old = *this; #ifdef DEBUG - Eterm key = db_getkey(tb->common.keypos, ret); + Eterm key = db_getkey(tb->keypos, ret); ASSERT(is_value(key)); ASSERT(cmp_key(tb, key, old) == 0); #endif @@ -3452,7 +3642,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, new->balance = old->balance; sc->lastobj = new->dbterm.tpl; *this = new; - free_term(tb, old); + free_term((DbTable*)tb, old); ++(sc->replaced); } if (--(sc->max) <= 0) { @@ -3462,7 +3652,7 @@ static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, } #ifdef TREE_DEBUG -static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show, +static void do_dump_tree2(DbTableCommon* tb, int to, void *to_arg, int show, TreeDbTerm *t, int offset) { if (t == NULL) @@ -3471,7 +3661,7 @@ static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show, if (show) { const char* prefix; Eterm term; - if (tb->common.compress) { + if (tb->compress) { prefix = "key="; term = GETKEY(tb, t->dbterm.tpl); } @@ -3526,7 +3716,7 @@ static void check_slot_pos(DbTableTree *tb) "element position %d is really 0x%08X, when stack says " "it's 0x%08X\n", tb->stack.slot, t, tb->stack.array[tb->stack.pos - 1]); - do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); + do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); } } @@ -3541,14 +3731,14 @@ static void check_saved_stack(DbTableTree *tb) if (t != stack->array[0]) { erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n", stack->array[0], t); - do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); + do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); return; } while (n < stack->pos) { if (t == NULL) { erts_fprintf(stderr, "NULL pointer in tree when stack not empty," " stack depth is %d\n", n); - do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); + do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); return; } n++; @@ -3562,7 +3752,7 @@ static void check_saved_stack(DbTableTree *tb) "represent child pointer in tree!" "(left == 0x%08X, right == 0x%08X\n", n, tb->stack[n], t->left, t->right); - do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); + do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, tb->root, 0); return; } } @@ -3581,7 +3771,7 @@ static int check_table_tree(DbTableTree* tb, TreeDbTerm *t) erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n", t->balance, t->left, t->right); erts_fprintf(stderr,"\nDump:\n---------------------------------\n"); - do_dump_tree2(tb, ERTS_PRINT_STDERR, NULL, 1, t, 0); + do_dump_tree2(&tb->common, ERTS_PRINT_STDERR, NULL, 1, t, 0); erts_fprintf(stderr,"\n---------------------------------\n"); } return ((rh > lh) ? rh : lh) + 1; diff --git a/erts/emulator/beam/erl_db_tree_util.h b/erts/emulator/beam/erl_db_tree_util.h new file mode 100644 index 0000000000..fbd9a9124a --- /dev/null +++ b/erts/emulator/beam/erl_db_tree_util.h @@ -0,0 +1,151 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 1998-2016. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + */ + +#ifndef _DB_TREE_UTIL_H +#define _DB_TREE_UTIL_H + +/* +** Internal functions and macros used by both the CA tree and the AVL tree +*/ + +/* +** A stack of this size is enough for an AVL tree with more than +** 0xFFFFFFFF elements. May be subject to change if +** the datatype of the element counter is changed to a 64 bit integer. +** The Maximal height of an AVL tree is calculated as: +** h(n) <= 1.4404 * log(n + 2) - 0.328 +** Where n denotes the number of nodes, h(n) the height of the tree +** with n nodes and log is the binary logarithm. +*/ + +#define STACK_NEED 50 + +#define PUSH_NODE(Dtt, Tdt) \ + ((Dtt)->array[(Dtt)->pos++] = Tdt) + +#define POP_NODE(Dtt) \ + (((Dtt)->pos) ? \ + (Dtt)->array[--((Dtt)->pos)] : NULL) + +#define TOP_NODE(Dtt) \ + ((Dtt->pos) ? \ + (Dtt)->array[(Dtt)->pos - 1] : NULL) + +#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL) + +static ERTS_INLINE void free_term(DbTable *tb, TreeDbTerm* p) +{ + db_free_term(tb, p, offsetof(TreeDbTerm, dbterm)); +} + +/* +** Some macros for "direction stacks" +*/ +#define DIR_LEFT 0 +#define DIR_RIGHT 1 +#define DIR_END 2 + +static ERTS_INLINE Sint cmp_key(DbTableCommon* tb, Eterm key, TreeDbTerm* obj) { + return CMP(key, GETKEY(tb,obj->dbterm.tpl)); +} + +int tree_balance_left(TreeDbTerm **this); +int tree_balance_right(TreeDbTerm **this); + +int db_first_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm *ret, DbTableTree *stack_container); +int db_next_tree_common(Process *p, DbTable *tbl, + TreeDbTerm *root, Eterm key, + Eterm *ret, DbTreeStack* stack); +int db_last_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm *ret, DbTableTree *stack_container); +int db_prev_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, Eterm key, + Eterm *ret, DbTreeStack* stack); +int db_put_tree_common(DbTableCommon *tb, TreeDbTerm **root, Eterm obj, + int key_clash_fail, DbTableTree *stack_container); +int db_get_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key, + Eterm *ret, DbTableTree *stack_container); +int db_get_element_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm *root, Eterm key, + int ndex, Eterm *ret, DbTableTree *stack_container); +int db_member_tree_common(DbTableCommon *tb, TreeDbTerm *root, Eterm key, Eterm *ret, + DbTableTree *stack_container); +int db_erase_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm key, Eterm *ret, + DbTreeStack *stack /* NULL if no static stack */); +int db_erase_object_tree_common(DbTable *tbl, TreeDbTerm **root, Eterm object, + Eterm *ret, DbTableTree *stack_container); +int db_slot_tree_common(Process *p, DbTable *tbl, TreeDbTerm *root, + Eterm slot_term, Eterm *ret, + DbTableTree *stack_container); +int db_select_chunk_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Sint chunk_size, + int reverse, Eterm *ret, + DbTableTree *stack_container); +int db_select_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, int reverse, Eterm *ret, + DbTableTree *stack_container); +int db_select_delete_tree_common(Process *p, DbTable *tbl, + TreeDbTerm **root, + Eterm tid, Eterm pattern, + Eterm *ret, + DbTreeStack* stack); +int db_select_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container); +int db_select_delete_continue_tree_common(Process *p, + DbTable *tbl, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTreeStack* stack); +int db_select_count_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Eterm *ret, + DbTableTree *stack_container); +int db_select_count_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container); +int db_select_replace_tree_common(Process *p, DbTableCommon *tb, TreeDbTerm **root, + Eterm tid, Eterm pattern, Eterm *ret, + DbTableTree *stack_container); +int db_select_replace_continue_tree_common(Process *p, + DbTableCommon *tb, + TreeDbTerm **root, + Eterm continuation, + Eterm *ret, + DbTableTree *stack_container); +int db_take_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, + Eterm key, Eterm *ret, + DbTreeStack *stack /* NULL if no static stack */); +void db_print_tree_common(fmtfn_t to, void *to_arg, + int show, TreeDbTerm *root, DbTable *tbl); +void db_foreach_offheap_tree_common(TreeDbTerm *root, + void (*func)(ErlOffHeap *, void *), + void * arg); +int db_lookup_dbterm_tree_common(Process *p, DbTable *tbl, TreeDbTerm **root, + Eterm key, Eterm obj, DbUpdateHandle* handle, + DbTableTree *stack_container); +void db_finalize_dbterm_tree_common(int cret, DbUpdateHandle *handle, + DbTableTree *stack_container); +#endif /* _DB_TREE_UTIL_H */ diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h index 6ec3b4f98f..be74bc795c 100644 --- a/erts/emulator/beam/erl_db_util.h +++ b/erts/emulator/beam/erl_db_util.h @@ -90,6 +90,8 @@ typedef struct { Uint new_size; int flags; void* lck; + void* lck2; + int current_level; } DbUpdateHandle; @@ -274,23 +276,26 @@ typedef struct db_table_common { } DbTableCommon; /* These are status bit patterns */ -#define DB_PRIVATE (1 << 0) -#define DB_PROTECTED (1 << 1) -#define DB_PUBLIC (1 << 2) -#define DB_DELETE (1 << 3) /* table is being deleted */ -#define DB_SET (1 << 4) -#define DB_BAG (1 << 5) -#define DB_DUPLICATE_BAG (1 << 6) -#define DB_ORDERED_SET (1 << 7) -#define DB_FINE_LOCKED (1 << 8) /* write_concurrency */ -#define DB_FREQ_READ (1 << 9) /* read_concurrency */ -#define DB_NAMED_TABLE (1 << 10) -#define DB_BUSY (1 << 11) +#define DB_PRIVATE (1 << 0) +#define DB_PROTECTED (1 << 1) +#define DB_PUBLIC (1 << 2) +#define DB_DELETE (1 << 3) /* table is being deleted */ +#define DB_SET (1 << 4) +#define DB_BAG (1 << 5) +#define DB_DUPLICATE_BAG (1 << 6) +#define DB_ORDERED_SET (1 << 7) +#define DB_CA_ORDERED_SET (1 << 8) +#define DB_FINE_LOCKED (1 << 9) /* write_concurrency */ +#define DB_FREQ_READ (1 << 10) /* read_concurrency */ +#define DB_NAMED_TABLE (1 << 11) +#define DB_BUSY (1 << 12) #define IS_HASH_TABLE(Status) (!!((Status) & \ (DB_BAG | DB_SET | DB_DUPLICATE_BAG))) #define IS_TREE_TABLE(Status) (!!((Status) & \ DB_ORDERED_SET)) +#define IS_CATREE_TABLE(Status) (!!((Status) & \ + DB_CA_ORDERED_SET)) #define NFIXED(T) (erts_refc_read(&(T)->common.fix_count,0)) #define IS_FIXED(T) (NFIXED(T) != 0) diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 463ae898a3..3b10ef8c25 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -91,6 +91,8 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "db_tab", "address" }, { "db_tab_fix", "address" }, { "db_hash_slot", "address" }, + { "erl_db_catree_base_node", "dynamic" }, + { "erl_db_catree_route_node", "dynamic" }, { "resource_monitors", "address" }, { "driver_list", NULL }, { "proc_msgq", "pid" }, @@ -707,6 +709,26 @@ erts_lc_get_lock_order_id(char *name) return (Sint16) -1; } +int +erts_lc_is_check_order(char *name) +{ + int i; + if (!name || name[0] == '\0') + erts_fprintf(stderr, "Missing lock name\n"); + + for (i = 0; i < ERTS_LOCK_ORDER_SIZE; i++) { + if (sys_strcmp(erts_lock_order[i].name, name) == 0) { + if (erts_lock_order[i].internal_order != NULL && + sys_strcmp(erts_lock_order[i].internal_order, "dynamic") == 0) { + return 0; + }else{ + return 1; + } + } + } + return 1; +} + static int compare_locked_by_id(lc_locked_lock_t *locked_lock, erts_lc_lock_t *comparand) { if(locked_lock->id < comparand->id) { @@ -987,15 +1009,17 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options) */ - /* Check that we are not trying to lock this lock twice */ - for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) { - if (tl_lck->id < lck->id - || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) { - if (tl_lck->id == lck->id && tl_lck->extra == lck->extra) - lock_twice("Trylocking", thr, lck, options); - break; - } - } + if (lck->check_order) { + /* Check that we are not trying to lock this lock twice */ + for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) { + if (tl_lck->id < lck->id + || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) { + if (tl_lck->id == lck->id && tl_lck->extra == lck->extra) + lock_twice("Trylocking", thr, lck, options); + break; + } + } + } #ifndef ERTS_LC_ALLWAYS_FORCE_BUSY_TRYLOCK_ON_LOCK_ORDER_VIOLATION /* We only force busy if a lock order violation would occur @@ -1044,7 +1068,7 @@ void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) { if (tl_lck->id < lck->id || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) { - if (tl_lck->id == lck->id && tl_lck->extra == lck->extra) + if (tl_lck->id == lck->id && tl_lck->extra == lck->extra && lck->check_order) lock_twice("Trylocking", thr, lck, options); if (locked) { ll->next = tl_lck->next; @@ -1164,9 +1188,10 @@ void erts_lc_lock_flg_x(erts_lc_lock_t *lck, erts_lock_options_t options, ASSERT(0 < lck->id && lck->id < ERTS_LOCK_ORDER_SIZE); thr->matrix.m[lck->id][0] = 1; } - else if (thr->locked.last->id < lck->id - || (thr->locked.last->id == lck->id - && thr->locked.last->extra < lck->extra)) { + else if (( ! lck->check_order && thr->locked.last->id == lck->id) || + (thr->locked.last->id < lck->id + || (thr->locked.last->id == lck->id + && thr->locked.last->extra < lck->extra))) { lc_locked_lock_t* ll; if (LOCK_IS_TYPE_ORDER_VIOLATION(lck->flags, thr->locked.last->flags)) { type_order_violation("locking ", thr, lck); @@ -1296,7 +1321,7 @@ void erts_lc_init_lock(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags) { lck->id = erts_lc_get_lock_order_id(name); - + lck->check_order = erts_lc_is_check_order(name); lck->extra = (UWord) &lck->extra; ASSERT(is_not_immed(lck->extra)); lck->flags = flags; @@ -1308,6 +1333,7 @@ void erts_lc_init_lock_x(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags, Eterm extra) { lck->id = erts_lc_get_lock_order_id(name); + lck->check_order = erts_lc_is_check_order(name); lck->extra = extra; ASSERT(is_immed(lck->extra)); lck->flags = flags; diff --git a/erts/emulator/beam/erl_lock_check.h b/erts/emulator/beam/erl_lock_check.h index d10e32985a..472e88ad65 100644 --- a/erts/emulator/beam/erl_lock_check.h +++ b/erts/emulator/beam/erl_lock_check.h @@ -46,6 +46,7 @@ typedef struct { int inited; Sint16 id; + int check_order; erts_lock_flags_t flags; erts_lock_options_t taken_options; UWord extra; @@ -53,11 +54,12 @@ typedef struct { #define ERTS_LC_INITITALIZED 0x7f7f7f7f -#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), (F), 0, (X)} +#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), 1, (F), 0, (X)} void erts_lc_init(void); void erts_lc_late_init(void); Sint16 erts_lc_get_lock_order_id(char *name); +int erts_lc_is_check_order(char *name); void erts_lc_check(erts_lc_lock_t *have, int have_len, erts_lc_lock_t *have_not, int have_not_len); void erts_lc_check_exact(erts_lc_lock_t *have, int have_len); |