diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 236 |
1 files changed, 93 insertions, 143 deletions
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index bc058dcf18..756f188983 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -568,14 +568,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, p = *this; if (dir == DIR_LEFT) { switch (p->balance) { - case 1: + case 1: p->balance = 0; state = 0; break; - case 0: + case 0: p->balance = -1; break; - case -1: /* The icky case */ + case -1: /* The icky case */ p1 = p->left; if (p1->balance == -1) { /* Single LL rotation */ p->left = p1->right; @@ -598,14 +598,14 @@ static TreeDbTerm* join_trees(TreeDbTerm *left_root_param, } } else { /* dir == DIR_RIGHT */ switch (p->balance) { - case -1: + case -1: p->balance = 0; state = 0; break; - case 0: + case 0: p->balance = 1; break; - case 1: + case 1: p1 = p->right; if (p1->balance == 1) { /* Single RR rotation */ p->right = p1->left; @@ -800,12 +800,6 @@ void destroy_root_iterator(CATreeRootIterator* iter) erts_free(ERTS_ALC_T_DB_TMP, iter->search_key); } -/* - * The following macros are used to create the ETS functions that only - * need to access one element (e.g. db_put_catree, db_get_catree and - * db_erase_catree). - */ - typedef struct { @@ -818,11 +812,15 @@ DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key, FindBaseNode* fbn) { DbTableCATreeNode* ERTS_RESTRICT node = GET_ROOT_ACQB(tb); - fbn->parent = NULL; - fbn->current_level = 0; + if (fbn) { + fbn->parent = NULL; + fbn->current_level = 0; + } while (!node->is_base_node) { - fbn->current_level++; - fbn->parent = node; + if (fbn) { + fbn->current_level++; + fbn->parent = node; + } if (cmp_key_route(key, node) < 0) { node = GET_LEFT_ACQB(node); } else { @@ -832,59 +830,36 @@ DbTableCATreeNode* find_base_node(DbTableCATree* tb, Eterm key, return node; } -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(LOCK,UNLOCK) \ - int retry; \ - DbTableCATreeNode *current_node; \ - DbTableCATreeNode *prev_node; \ - DbTableCATreeBaseNode *base_node; \ - int current_level; \ - (void)prev_node; \ - do { \ - retry = 0; \ - current_node = GET_ROOT_ACQB(tb); \ - prev_node = NULL; \ - current_level = 0; \ - while ( ! current_node->is_base_node ) { \ - current_level = current_level + 1; \ - prev_node = current_node; \ - if (cmp_key_route(key,current_node) < 0) { \ - current_node = GET_LEFT_ACQB(current_node); \ - } else { \ - current_node = GET_RIGHT_ACQB(current_node); \ - } \ - } \ - base_node = ¤t_node->u.base; \ - LOCK(current_node); \ - if ( ! base_node->is_valid ) { \ - /* Retry */ \ - UNLOCK(current_node); \ - retry = 1; \ - } \ - } while(retry); - -#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ - static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ - Eterm key, \ - PARAMETERS){ \ - int result; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node) \ - result = SEQUENTAIL_CALL; \ - wunlock_adapt_base_node(tb, current_node, prev_node, current_level);\ - return result; \ +static ERTS_INLINE +DbTableCATreeNode* find_rlock_valid_base_node(DbTableCATree* tb, Eterm key) +{ + DbTableCATreeNode* base_node; + + while (1) { + base_node = find_base_node(tb, key, NULL); + rlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + runlock_base_node(base_node); } + return base_node; +} +static ERTS_INLINE +DbTableCATreeNode* find_wlock_valid_base_node(DbTableCATree* tb, Eterm key, + FindBaseNode* fbn) +{ + DbTableCATreeNode* base_node; -#define ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(FUN_POSTFIX,PARAMETERS,SEQUENTAIL_CALL) \ - static int erl_db_catree_do_operation_##FUN_POSTFIX(DbTableCATree *tb, \ - Eterm key, \ - PARAMETERS){ \ - int result; \ - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(rlock_base_node,runlock_base_node) \ - result = SEQUENTAIL_CALL; \ - runlock_base_node(current_node); \ - return result; \ + while (1) { + base_node = find_base_node(tb, key, fbn); + wlock_base_node(base_node); + if (base_node->u.base.is_valid) + break; + wunlock_base_node(base_node); } - + return base_node; +} static ERTS_INLINE Eterm copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size) @@ -1534,40 +1509,29 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) return result; } -#define ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS Eterm obj, int key_clash_fail -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(put, - ERL_DB_CATREE_DO_OPERATION_PUT_PARAMS, - db_put_tree_common(&tb->common, - &base_node->root, - obj, - key_clash_fail, - NULL);) - static int db_put_catree(DbTable *tbl, Eterm obj, int key_clash_fail) { DbTableCATree *tb = &tbl->catree; Eterm key = GETKEY(&tb->common, tuple_val(obj)); - return erl_db_catree_do_operation_put(tb, - key, - obj, - key_clash_fail); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_put_tree_common(&tb->common, &node->u.base.root, obj, + key_clash_fail, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_GET_PARAMS Process *p, Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get, - ERL_DB_CATREE_DO_OPERATION_GET_PARAMS, - db_get_tree_common(p, &tb->common, - base_node->root, - key, ret, NULL);) - static int db_get_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_get(tb, key, p, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_tree_common(p, &tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; } - - TreeDbTerm** catree_find_root(Eterm key, CATreeRootIterator* iter) { FindBaseNode fbn; @@ -1822,66 +1786,53 @@ TreeDbTerm** catree_find_last_root(CATreeRootIterator* iter) return catree_find_firstlast_root(iter, 0); } - -#define ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(member, - ERL_DB_CATREE_DO_OPERATION_MEMBER_PARAMS, - db_member_tree_common(&tb->common, - base_node->root, - key, - ret, - NULL);) - static int db_member_catree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_member(tb, key, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_member_tree_common(&tb->common, + node->u.base.root, + key, ret, NULL); + runlock_base_node(node); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS Process *p, int ndex, Eterm *ret -ERL_DB_CATREE_CREATE_DO_READ_OPERATION_FUNCTION(get_element, - ERL_DB_CATREE_DO_OPERATION_GET_ELEMENT_PARAMS, - db_get_element_tree_common(p, &tb->common, - base_node->root, - key, ndex, - ret, NULL)) - static int db_get_element_catree(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_get_element(tb, key, p, ndex, ret); + DbTableCATreeNode* node = find_rlock_valid_base_node(tb, key); + int result = db_get_element_tree_common(p, &tb->common, + node->u.base.root, + key, ndex, ret, NULL); + runlock_base_node(node); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS DbTable *tbl, Eterm *ret -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase, - ERL_DB_CATREE_DO_OPERATION_ERASE_PARAMS, - db_erase_tree_common(tbl, - &base_node->root, - key, - ret, - NULL);) - static int db_erase_catree(DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_erase(tb, key, tbl, ret); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_tree_common(tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } -#define ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS DbTable *tbl, Eterm object, Eterm *ret -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(erase_object, - ERL_DB_CATREE_DO_OPERATION_ERASE_OBJECT_PARAMS, - db_erase_object_tree_common(tbl, - &base_node->root, - object, - ret, - NULL);) - static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret) { DbTableCATree *tb = &tbl->catree; Eterm key = GETKEY(&tb->common, tuple_val(object)); - return erl_db_catree_do_operation_erase_object(tb, key, tbl, object, ret); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_erase_object_tree_common(tbl, + &node->u.base.root, + object, + ret, + NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } @@ -2031,17 +1982,15 @@ static int db_select_replace_continue_catree(Process *p, DbTable *tbl, return result; } -#define ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS Process *p, Eterm *ret, DbTable *tbl -ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION(take, - ERL_DB_CATREE_DO_OPERATION_TAKE_PARAMS, - db_take_tree_common(p, tbl, - &base_node->root, - key, ret, NULL);) - static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableCATree *tb = &tbl->catree; - return erl_db_catree_do_operation_take(tb, key, p, ret, tbl); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int result = db_take_tree_common(p, tbl, &node->u.base.root, key, + ret, NULL); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); + return result; } /* @@ -2154,16 +2103,17 @@ static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm ob DbUpdateHandle *handle) { DbTableCATree *tb = &tbl->catree; - int res; - ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_FIND_BASE_NODE_PART(wlock_base_node,wunlock_base_node); - res = db_lookup_dbterm_tree_common(p, tbl, &base_node->root, key, obj, handle, NULL); + FindBaseNode fbn; + DbTableCATreeNode* node = find_wlock_valid_base_node(tb, key, &fbn); + int res = db_lookup_dbterm_tree_common(p, tbl, &node->u.base.root, key, + obj, handle, NULL); if (res == 0) { - wunlock_adapt_base_node(tb, current_node, prev_node, current_level); + wunlock_adapt_base_node(tb, node, fbn.parent, fbn.current_level); } else { /* db_finalize_dbterm_catree will unlock */ - handle->lck = prev_node; - handle->lck2 = current_node; - handle->current_level = current_level; + handle->lck = fbn.parent; + handle->lck2 = node; + handle->current_level = fbn.current_level; } return res; } |