aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/erl_db_catree.c236
1 files changed, 93 insertions, 143 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index bc058dcf18..756f188983 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -568,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
p = *this;
if (dir == DIR_LEFT) {
switch (p->balance) {
- case 1:
+ case 1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = -1;
break;
- case -1: /* The icky case */
+ case -1: /* The icky case */
p1 = p->left;
if (p1->balance == -1) { /* Single LL rotation */
p->left = p1->right;
@@ -598,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param,
}
} else { /* dir == DIR_RIGHT */
switch (p->balance) {
- case -1:
+ case -1:
p->balance = 0;
state = 0;
break;
- case 0:
+ case 0:
p->balance = 1;
break;
- case 1:
+ case 1:
p1 = p->right;
if (p1->balance == 1) { /* Single RR rotation */
p->right = p1->left;
@@ -800,12 +800,6 @@ void destroy_root_iterator(CATreeRootIterator* iter)
erts_free(ERTS_ALC_T_DB_TMP, iter->search_key);
}
-/*
- * The following macros are used to create the ETS functions that only
- * need to access one element (e.g. db_put_catree, db_get_catree and
- * db_erase_catree).
- */
-
typedef struct
{
@@ -818,11 +812,15 @@ DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
FindBaseNode* fbn)
{
DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb);
- fbn->parent = NULL;
- fbn->current_level = 0;
+ if (fbn) {
+ fbn->parent = NULL;
+ fbn->current_level = 0;
+ }
while (!node->is_base_node) {
- fbn->current_level++;
- fbn->parent = node;
+ if (fbn) {
+ fbn->current_level++;
+ fbn->parent = node;
+ }
if (cmp_key_route(key, node) < 0) {
node = GET_LEFT_ACQB(node);
} else {
@@ -832,59 +830,36 @@ DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key,
return node;
}
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \
- int retry; \
- DbTableCATreeNode *current_node; \
- DbTableCATreeNode *prev_node; \
- DbTableCATreeBaseNode *base_node; \
- int current_level; \
- (void)prev_node; \
- do { \
- retry = 0; \
- current_node = GET_ROOT_ACQB(tb); \
- prev_node = NULL; \
- current_level = 0; \
- while ( ! current_node->is_base_node ) { \
- current_level = current_level + 1; \
- prev_node = current_node; \
- if (cmp_key_route(key,current_node) < 0) { \
- current_node = GET_LEFT_ACQB(current_node); \
- } else { \
- current_node = GET_RIGHT_ACQB(current_node); \
- } \
- } \
- base_node = &current_node->u.base; \
- LOCK(current_node); \
- if ( ! base_node->is_valid ) { \
- /* Retry */ \
- UNLOCK(current_node); \
- retry = 1; \
- } \
- } while(retry);
-
-#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \
- result = SEQUENTAIL_CALL; \
- wunlock_adapt_base_node(tb, current_node, prev_node, current_level);\
- return result; \
+static ERTS_INLINE
+DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key)
+{
+ DbTableCATreeNode* base_node;
+
+ while (1) {
+ base_node = find_base_node(tb, key, NULL);
+ rlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ runlock_base_node(base_node);
}
+ return base_node;
+}
+static ERTS_INLINE
+DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key,
+ FindBaseNode* fbn)
+{
+ DbTableCATreeNode* base_node;
-#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \
- static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \
- Eterm key, \
- PARAMETERS){ \
- int result; \
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \
- result = SEQUENTAIL_CALL; \
- runlock_base_node(current_node); \
- return result; \
+ while (1) {
+ base_node = find_base_node(tb, key, fbn);
+ wlock_base_node(base_node);
+ if (base_node->u.base.is_valid)
+ break;
+ wunlock_base_node(base_node);
}
-
+ return base_node;
+}
static ERTS_INLINE
Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
@@ -1534,40 +1509,29 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put,
- ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS,
- db_put_tree_common(&tb->common,
- &base_node->root,
- obj,
- key_clash_fail,
- NULL);)
-
static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(obj));
- return erl_db_catree_do_operation_put(tb,
- key,
- obj,
- key_clash_fail);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_put_tree_common(&tb->common, &node->u.base.root, obj,
+ key_clash_fail, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get,
- ERL_DB_CATREE_DO_OPERATION_GET_PARAMS,
- db_get_tree_common(p, &tb->common,
- base_node->root,
- key, ret, NULL);)
-
static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get(tb, key, p, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-
-
TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter)
{
FindBaseNode fbn;
@@ -1822,66 +1786,53 @@ TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter)
return catree_find_firstlast_root(iter, 0);
}
-
-#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member,
- ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS,
- db_member_tree_common(&tb->common,
- base_node->root,
- key,
- ret,
- NULL);)
-
static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_member(tb, key, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_member_tree_common(&tb->common,
+ node->u.base.root,
+ key, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element,
- ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS,
- db_get_element_tree_common(p, &tb->common,
- base_node->root,
- key, ndex,
- ret, NULL))
-
static int db_get_element_catree(Process *p, DbTable *tbl,
Eterm key, int ndex, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret);
+ DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key);
+ int result = db_get_element_tree_common(p, &tb->common,
+ node->u.base.root,
+ key, ndex, ret, NULL);
+ runlock_base_node(node);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase,
- ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS,
- db_erase_tree_common(tbl,
- &base_node->root,
- key,
- ret,
- NULL);)
-
static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_erase(tb, key, tbl, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_tree_common(tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object,
- ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS,
- db_erase_object_tree_common(tbl,
- &base_node->root,
- object,
- ret,
- NULL);)
-
static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
Eterm key = GETKEY(&tb->common, tuple_val(object));
- return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_erase_object_tree_common(tbl,
+ &node->u.base.root,
+ object,
+ ret,
+ NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
@@ -2031,17 +1982,15 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
return result;
}
-#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl
-ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take,
- ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS,
- db_take_tree_common(p, tbl,
- &base_node->root,
- key, ret, NULL);)
-
static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableCATree *tb = &tbl->catree;
- return erl_db_catree_do_operation_take(tb, key, p, ret, tbl);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int result = db_take_tree_common(p, tbl, &node->u.base.root, key,
+ ret, NULL);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
+ return result;
}
/*
@@ -2154,16 +2103,17 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob
DbUpdateHandle *handle)
{
DbTableCATree *tb = &tbl->catree;
- int res;
- ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node);
- res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL);
+ FindBaseNode fbn;
+ DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn);
+ int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key,
+ obj, handle, NULL);
if (res == 0) {
- wunlock_adapt_base_node(tb, current_node, prev_node, current_level);
+ wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level);
} else {
/* db_finalize_dbterm_catree will unlock */
- handle->lck = prev_node;
- handle->lck2 = current_node;
- handle->current_level = current_level;
+ handle->lck = fbn.parent;
+ handle->lck2 = node;
+ handle->current_level = fbn.current_level;
}
return res;
}