aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/erl_bif_binary.c8
-rw-r--r--erts/emulator/beam/erl_db.c31
-rw-r--r--erts/emulator/beam/erl_db.h26
-rw-r--r--erts/emulator/beam/erl_db_catree.c977
-rw-r--r--erts/emulator/beam/erl_db_catree.h21
-rw-r--r--erts/emulator/beam/erl_db_hash.c10
-rw-r--r--erts/emulator/beam/erl_lock_check.c110
-rw-r--r--erts/emulator/beam/erl_lock_check.h8
-rw-r--r--erts/emulator/beam/erl_lock_flags.h6
9 files changed, 612 insertions, 585 deletions
diff --git a/erts/emulator/beam/erl_bif_binary.c b/erts/emulator/beam/erl_bif_binary.c
index ff919082c3..5b3f091ccc 100644
--- a/erts/emulator/beam/erl_bif_binary.c
+++ b/erts/emulator/beam/erl_bif_binary.c
@@ -471,6 +471,9 @@ static BMData *create_bmdata(MyAllocator *my, byte *x, Uint len,
Binary **the_bin /* out */)
{
Uint datasize;
+ BMData *bmd;
+ Binary *mb;
+ byte *data;
if(len > 1) {
datasize = BM_SIZE_MULTI(len);
@@ -478,9 +481,8 @@ static BMData *create_bmdata(MyAllocator *my, byte *x, Uint len,
datasize = BM_SIZE_SINGLE();
}
- BMData *bmd;
- Binary *mb = erts_create_magic_binary(datasize,cleanup_my_data_bm);
- byte *data = ERTS_MAGIC_BIN_DATA(mb);
+ mb = erts_create_magic_binary(datasize,cleanup_my_data_bm);
+ data = ERTS_MAGIC_BIN_DATA(mb);
init_my_allocator(my, datasize, data);
bmd = my_alloc(my, sizeof(BMData));
bmd->x = my_alloc(my,len);
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index 1df972f4b6..3653c0bf7c 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -409,30 +409,17 @@ static void
free_dbtable(void *vtb)
{
DbTable *tb = (DbTable *) vtb;
-#ifdef HARDDEBUG
- if (erts_atomic_read_nob(&tb->common.memory_size) != sizeof(DbTable)) {
- erts_fprintf(stderr, "ets: free_dbtable memory remain=%ld fix=%x\n",
- erts_atomic_read_nob(&tb->common.memory_size)-sizeof(DbTable),
- tb->common.fixations);
- }
-#endif
- if (erts_atomic_read_nob(&tb->common.memory_size) > sizeof(DbTable)) {
- /* The CA tree implementation use delayed freeing and the DbTable needs to
- be freed after all other memory blocks that are allocated by the table. */
- erts_schedule_thr_prgr_later_cleanup_op(free_dbtable,
- (void *) tb,
- &tb->release.data,
- sizeof(DbTable));
- return;
- }
- erts_rwmtx_destroy(&tb->common.rwlock);
- erts_mtx_destroy(&tb->common.fixlock);
- ASSERT(is_immed(tb->common.heir_data));
- if (tb->common.btid)
- erts_bin_release(tb->common.btid);
+ ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
+
+ erts_rwmtx_destroy(&tb->common.rwlock);
+ erts_mtx_destroy(&tb->common.fixlock);
+ ASSERT(is_immed(tb->common.heir_data));
+
+ if (tb->common.btid)
+ erts_bin_release(tb->common.btid);
- erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));
+ erts_db_free(ERTS_ALC_T_DB_TABLE, tb, (void *) tb, sizeof(DbTable));
}
static void schedule_free_dbtable(DbTable* tb)
diff --git a/erts/emulator/beam/erl_db.h b/erts/emulator/beam/erl_db.h
index 45d120ac0e..7a915ccea2 100644
--- a/erts/emulator/beam/erl_db.h
+++ b/erts/emulator/beam/erl_db.h
@@ -286,6 +286,12 @@ ERTS_GLB_INLINE void erts_db_free(ErtsAlcType_t type,
void *ptr,
Uint size);
+ERTS_GLB_INLINE void erts_schedule_db_free(DbTableCommon* tab,
+ void (*free_func)(void *),
+ void *ptr,
+ ErtsThrPrgrLaterOp *lop,
+ Uint size);
+
ERTS_GLB_INLINE void erts_db_free_nt(ErtsAlcType_t type,
void *ptr,
Uint size);
@@ -306,6 +312,26 @@ erts_db_free(ErtsAlcType_t type, DbTable *tab, void *ptr, Uint size)
}
ERTS_GLB_INLINE void
+erts_schedule_db_free(DbTableCommon* tab,
+ void (*free_func)(void *),
+ void *ptr,
+ ErtsThrPrgrLaterOp *lop,
+ Uint size)
+{
+ ASSERT(ptr != 0);
+ ASSERT(((void *) tab) != ptr);
+ ASSERT(size == ERTS_ALC_DBG_BLK_SZ(ptr));
+
+ /*
+ * We update table memory stats here as table may already be gone
+ * when 'free_func' is finally called.
+ */
+ ERTS_DB_ALC_MEM_UPDATE_((DbTable*)tab, size, 0);
+
+ erts_schedule_thr_prgr_later_cleanup_op(free_func, ptr, lop, size);
+}
+
+ERTS_GLB_INLINE void
erts_db_free_nt(ErtsAlcType_t type, void *ptr, Uint size)
{
ASSERT(ptr != 0);
diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c
index 37a299df35..a8e48bce1b 100644
--- a/erts/emulator/beam/erl_db_catree.c
+++ b/erts/emulator/beam/erl_db_catree.c
@@ -210,33 +210,32 @@ DbTableMethod db_catree =
* Internal CA tree related helper functions and macros
*/
-#define GET_ROUTE_NODE_KEY(node) (node->baseOrRoute.route.key.tpl[0])
-#define GET_BASE_NODE_LOCK(node) (&(node->baseOrRoute.base.lock))
-#define GET_ROUTE_NODE_LOCK(node) (&(node->baseOrRoute.route.lock))
+#define GET_ROUTE_NODE_KEY(node) (node->u.route.key.term)
+#define GET_BASE_NODE_LOCK(node) (&(node->u.base.lock))
+#define GET_ROUTE_NODE_LOCK(node) (&(node->u.route.lock))
/* Helpers for reading and writing shared atomic variables */
/* No memory barrier */
#define GET_ROOT(tb) ((DbTableCATreeNode*)erts_atomic_read_nob(&(tb->root)))
-#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_nob(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT(tb, v) erts_atomic_set_nob(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT(ca_tree_route_node, v) erts_atomic_set_nob(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Release or acquire barriers */
#define GET_ROOT_ACQB(tb) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(tb->root)))
-#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.left)))
-#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->baseOrRoute.route.right)))
+#define GET_LEFT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.left)))
+#define GET_RIGHT_ACQB(ca_tree_route_node) ((DbTableCATreeNode*)erts_atomic_read_acqb(&(ca_tree_route_node->u.route.right)))
#define SET_ROOT_RELB(tb, v) erts_atomic_set_relb(&((tb)->root), (erts_aint_t)(v))
-#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.left), (erts_aint_t)(v));
-#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->baseOrRoute.route.right), (erts_aint_t)(v));
+#define SET_LEFT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.left), (erts_aint_t)(v));
+#define SET_RIGHT_RELB(ca_tree_route_node, v) erts_atomic_set_relb(&(ca_tree_route_node->u.route.right), (erts_aint_t)(v));
/* Compares a key to the key in a route node */
-static ERTS_INLINE Sint cmp_key_route(DbTableCommon * tb,
- Eterm key,
+static ERTS_INLINE Sint cmp_key_route(Eterm key,
DbTableCATreeNode *obj)
{
return CMP(key, GET_ROUTE_NODE_KEY(obj));
@@ -279,7 +278,7 @@ int less_than_two_elements(TreeDbTerm *root)
* Inserts a TreeDbTerm into a tree. Returns the new root.
*/
static ERTS_INLINE
-TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
+TreeDbTerm* insert_TreeDbTerm(DbTableCATree *tb,
TreeDbTerm *insert_to_root,
TreeDbTerm *value_to_insert) {
/* Non recursive insertion in AVL tree, building our own stack */
@@ -295,7 +294,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
int dir;
TreeDbTerm *p1, *p2, *p;
- key = GETKEY(common_table_data, value_to_insert->dbterm.tpl);
+ key = GETKEY(tb, value_to_insert->dbterm.tpl);
dstack[dpos++] = DIR_END;
for (;;)
@@ -305,7 +304,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
(*this)->balance = 0;
(*this)->left = (*this)->right = NULL;
break;
- } else if ((c = cmp_key(common_table_data, key, *this)) < 0) {
+ } else if ((c = cmp_key(&tb->common, key, *this)) < 0) {
/* go lefts */
dstack[dpos++] = DIR_LEFT;
tstack[tpos++] = this;
@@ -392,7 +391,7 @@ TreeDbTerm* insert_TreeDbTerm(DbTableCommon *common_table_data,
* left_wb and the tree containing the rest of the keys in the write
* back parameter right_wb.
*/
-static void split_tree(DbTableCommon *tb,
+static void split_tree(DbTableCATree *tb,
TreeDbTerm *root,
TreeDbTerm **split_key_node_wb,
TreeDbTerm **left_wb,
@@ -413,9 +412,7 @@ static void split_tree(DbTableCommon *tb,
split_node->left = NULL;
right_root = split_node->right;
split_node->right = NULL;
- right_root = insert_TreeDbTerm(tb,
- right_root,
- split_node);
+ right_root = insert_TreeDbTerm(tb, right_root, split_node);
*split_key_node_wb = split_node;
*left_wb = left_root;
*right_wb = right_root;
@@ -700,15 +697,17 @@ void runlock_base_node(DbTableCATreeBaseNode *base_node)
}
static ERTS_INLINE
-void lock_route_node(DbTableCATreeRouteNode *route_node)
+void lock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_lock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_lock(&route_node->u.route.lock);
}
static ERTS_INLINE
-void unlock_route_node(DbTableCATreeRouteNode *route_node)
+void unlock_route_node(DbTableCATreeNode *route_node)
{
- erts_mtx_unlock(&route_node->lock);
+ ASSERT(!route_node->is_base_node);
+ erts_mtx_unlock(&route_node->u.route.lock);
}
@@ -722,7 +721,6 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
int retry; \
DbTableCATreeNode *current_node; \
DbTableCATreeNode *prev_node; \
- DbTableCommon* common_table_data = &tb->common; \
DbTableCATreeBaseNode *base_node; \
int current_level; \
(void)prev_node; \
@@ -734,13 +732,13 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
while ( ! current_node->is_base_node ) { \
current_level = current_level + 1; \
prev_node = current_node; \
- if (cmp_key_route(common_table_data,key,current_node) < 0) { \
+ if (cmp_key_route(key,current_node) < 0) { \
current_node = GET_LEFT_ACQB(current_node); \
} else { \
current_node = GET_RIGHT_ACQB(current_node); \
} \
} \
- base_node = &current_node->baseOrRoute.base; \
+ base_node = &current_node->u.base; \
LOCK(base_node); \
if ( ! base_node->is_valid ) { \
/* Retry */ \
@@ -753,7 +751,7 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
#define ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART \
if (base_node->lock_statistics > ERL_DB_CATREE_HIGH_CONTENTION_LIMIT \
&& current_level < ERL_DB_CATREE_MAX_ROUTE_NODE_LAYER_HEIGHT) { \
- split_catree(&tb->common, prev_node, current_node); \
+ split_catree(tb, prev_node, current_node); \
} else if (base_node->lock_statistics < ERL_DB_CATREE_LOW_CONTENTION_LIMIT) { \
join_catree(tb, prev_node, current_node); \
} else { \
@@ -784,141 +782,189 @@ void unlock_route_node(DbTableCATreeRouteNode *route_node)
}
+static ERTS_INLINE
+void copy_route_key(DbRouteKey* dst, Eterm key, Uint key_size)
+{
+ dst->size = key_size;
+ if (key_size != 0) {
+ Eterm* hp = &dst->heap[0];
+ ErlOffHeap tmp_offheap;
+ tmp_offheap.first = NULL;
+ dst->term = copy_struct(key, key_size, &hp, &tmp_offheap);
+ dst->oh = tmp_offheap.first;
+ }
+ else {
+ ASSERT(is_immed(key));
+ dst->term = key;
+ dst->oh = NULL;
+ }
+}
-static DbTableCATreeNode *create_catree_base_node(DbTableCATree *tb)
+static ERTS_INLINE
+void destroy_route_key(DbRouteKey* key)
{
- DbTableCATreeNode *new_base_node_container =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) tb,
- sizeof(DbTableCATreeNode));
- DbTableCATreeBaseNode *new_base_node =
- &new_base_node_container->baseOrRoute.base;
+ if (key->oh) {
+ ErlOffHeap oh;
+ oh.first = key->oh;
+ erts_cleanup_offheap(&oh);
+ }
+}
+
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+# define sizeof_base_node(KEY_SZ) \
+ (offsetof(DbTableCATreeNode, u.base.lc_key.heap) \
+ + (KEY_SZ)*sizeof(Eterm))
+# define LC_ORDER(ORDER) ORDER
+#else
+# define sizeof_base_node(KEY_SZ) \
+ offsetof(DbTableCATreeNode, u.base.end_of_struct__)
+# define LC_ORDER(ORDER) NIL
+#endif
+
+static DbTableCATreeNode *create_base_node(DbTableCATree *tb,
+ TreeDbTerm* root,
+ Eterm lc_key)
+{
+ DbTableCATreeNode *p;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
- new_base_node_container->is_base_node = 1;
- new_base_node->root = NULL;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ Eterm lc_key_size = size_object(lc_key);
+#endif
+ p = erts_db_alloc(ERTS_ALC_T_DB_TABLE, (DbTable *) tb,
+ sizeof_base_node(lc_key_size));
+
+ p->is_base_node = 1;
+ p->u.base.root = root;
if (tb->common.type & DB_FREQ_READ)
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
if (erts_ets_rwmtx_spin_count >= 0)
rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count;
- erts_rwmtx_init_opt(&new_base_node->lock, &rwmtx_opt,
- "erl_db_catree_base_node", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
- new_base_node->lock_statistics = 0;
- new_base_node->is_valid = 1;
- new_base_node->tab = (DbTable *) tb;
- return new_base_node_container;
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ copy_route_key(&p->u.base.lc_key, lc_key, lc_key_size);
+#endif
+ erts_rwmtx_init_opt(&p->u.base.lock, &rwmtx_opt,
+ "erl_db_catree_base_node",
+ lc_key,
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ p->u.base.lock_statistics = 0;
+ p->u.base.is_valid = 1;
+ return p;
+}
+
+static ERTS_INLINE
+DbTableCATreeNode *create_wlocked_base_node(DbTableCATree *tb,
+ TreeDbTerm* root,
+ Eterm lc_key)
+{
+ DbTableCATreeNode* p = create_base_node(tb, root, lc_key);
+ ethr_rwmutex_rwlock(&p->u.base.lock.rwmtx);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_lc_trylock_flg(-1, &p->u.base.lock.lc, ERTS_LOCK_OPTIONS_RDWR);
+#endif
+ return p;
+}
+
+
+static ERTS_INLINE Uint sizeof_route_node(Uint key_size)
+{
+ return (offsetof(DbTableCATreeNode, u.route.key.heap)
+ + key_size*sizeof(Eterm));
}
static DbTableCATreeNode*
-create_catree_route_node(DbTableCommon * common_table_data,
- DbTableCATreeNode *left,
- DbTableCATreeNode *right,
- DbTerm * keyTerm)
+create_route_node(DbTableCATree *tb,
+ DbTableCATreeNode *left,
+ DbTableCATreeNode *right,
+ DbTerm * keyTerm,
+ DbTableCATreeNode* lc_parent)
{
- Eterm* top;
- Eterm key = GETKEY(common_table_data,keyTerm->tpl);
+ Eterm key = GETKEY(tb,keyTerm->tpl);
int key_size = size_object(key);
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- size_t route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- ErlOffHeap tmp_offheap;
- byte* new_route_node_container_bytes =
- erts_db_alloc(ERTS_ALC_T_DB_TABLE,
- (DbTable *) common_table_data,
- route_node_container_size);
- DbTerm* newp = (DbTerm*) (new_route_node_container_bytes + offset);
- DbTableCATreeNode *new_route_node_container =
- (DbTableCATreeNode*)new_route_node_container_bytes;
- DbTableCATreeRouteNode *new_route_node =
- &new_route_node_container->baseOrRoute.route;
- new_route_node->tab = (DbTable *)common_table_data;
- if (key_size != 0) {
- newp->size = key_size;
- top = &newp->tpl[1];
- tmp_offheap.first = NULL;
- newp->tpl[0] = copy_struct(key, key_size, &top, &tmp_offheap);
- newp->first_oh = tmp_offheap.first;
- } else {
- newp->size = key_size;
- newp->first_oh = NULL;
- newp->tpl[0] = key;
- }
- new_route_node_container->is_base_node = 0;
- new_route_node->is_valid = 1;
- erts_atomic_init_nob(&(new_route_node->left), (erts_aint_t)left);
- erts_atomic_init_nob(&(new_route_node->right), (erts_aint_t)right);
- erts_mtx_init(&new_route_node->lock, "erl_db_catree_route_node",
- NIL, ERTS_LOCK_FLAGS_CATEGORY_DB);
- return new_route_node_container;
+ DbTableCATreeNode* p = erts_db_alloc(ERTS_ALC_T_DB_TABLE,
+ (DbTable *) tb,
+ sizeof_route_node(key_size));
+
+ copy_route_key(&p->u.route.key, key, key_size);
+ p->is_base_node = 0;
+ p->u.route.is_valid = 1;
+ erts_atomic_init_nob(&p->u.route.left, (erts_aint_t)left);
+ erts_atomic_init_nob(&p->u.route.right, (erts_aint_t)right);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ /* Route node lock order is inverse tree depth (from leafs toward root) */
+ p->u.route.lc_order = (lc_parent == NULL ? MAX_SMALL :
+ lc_parent->u.route.lc_order - 1);
+ /*
+ * This assert may eventually fail as we don't increase 'lc_order' in join
+ * operations when route nodes move up in the tree.
+ * Tough luck if you run a lock-checking VM for such a long time on 32-bit.
+ */
+ ERTS_LC_ASSERT(p->u.route.lc_order >= 0);
+#endif
+ erts_mtx_init(&p->u.route.lock, "erl_db_catree_route_node",
+ LC_ORDER(make_small(p->u.route.lc_order)),
+ ERTS_LOCK_FLAGS_CATEGORY_DB);
+ return p;
}
-static void free_catree_base_node(void* base_node_container_ptr)
+static void do_free_base_node(void* vptr)
{
- DbTableCATreeNode *base_node_container =
- (DbTableCATreeNode *)base_node_container_ptr;
- DbTableCATreeBaseNode *base_node =
- &base_node_container->baseOrRoute.base;
- erts_rwmtx_destroy(&base_node->lock);
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- base_node->tab, base_node_container,
- sizeof(DbTableCATreeNode));
-}
-
-static void free_catree_routing_node(void *route_node_container_ptr)
-{
- size_t route_node_container_size;
- byte* route_node_container_bytes = route_node_container_ptr;
- DbTableCATreeNode *route_node_container =
- (DbTableCATreeNode *)route_node_container_bytes;
- DbTableCATreeRouteNode *route_node =
- &route_node_container->baseOrRoute.route;
- int key_size = route_node->key.size;
- Uint offset = offsetof(DbTableCATreeNode,baseOrRoute) +
- offsetof(DbTableCATreeRouteNode,key);
- ErlOffHeap tmp_oh;
- DbTerm* db_term = (DbTerm*) (route_node_container_bytes + offset);
- erts_mtx_destroy(&route_node->lock);
- route_node_container_size =
- offset +
- sizeof(DbTerm) +
- sizeof(Eterm)*key_size;
- if (key_size != 0) {
- tmp_oh.first = db_term->first_oh;
- erts_cleanup_offheap(&tmp_oh);
- }
- erts_db_free(ERTS_ALC_T_DB_TABLE,
- route_node->tab,
- route_node_container,
- route_node_container_size);
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(p->is_base_node);
+ erts_rwmtx_destroy(&p->u.base.lock);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ destroy_route_key(&p->u.base.lc_key);
+#endif
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
+}
+
+static void free_catree_base_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_base_node(p->u.base.lc_key.size), 0);
+ do_free_base_node(p);
+}
+
+static void do_free_route_node(void *vptr)
+{
+ DbTableCATreeNode *p = (DbTableCATreeNode *)vptr;
+ ASSERT(!p->is_base_node);
+ erts_mtx_destroy(&p->u.route.lock);
+ destroy_route_key(&p->u.route.key);
+ erts_free(ERTS_ALC_T_DB_TABLE, p);
}
+static void free_catree_route_node(DbTableCATree* tb, DbTableCATreeNode* p)
+{
+ ASSERT(!p->is_base_node);
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sizeof_route_node(p->u.route.key.size), 0);
+ do_free_route_node(p);
+}
+
+
/*
* Returns the parent routing node of the specified
- * route_node_container if such a routing node exists or NULL if
- * route_node_container is attached to the root
+ * route node 'child' if such a parent exists
+ * or NULL if 'child' is attached to the root.
*/
static ERTS_INLINE DbTableCATreeNode *
parent_of(DbTableCATree *tb,
- DbTableCATreeNode *route_node_container)
+ DbTableCATreeNode *child)
{
+ Eterm key = GET_ROUTE_NODE_KEY(child);
+ DbTableCATreeNode *current = GET_ROOT_ACQB(tb);
+ DbTableCATreeNode *prev = NULL;
- Eterm key = GET_ROUTE_NODE_KEY(route_node_container);
- DbTableCATreeNode *current_node = GET_ROOT_ACQB(tb);
- DbTableCATreeNode *prev_node = NULL;
- if (current_node == route_node_container) {
- return NULL;
- }
- while (current_node != route_node_container) {
- prev_node = current_node;
- if (cmp_key_route((DbTableCommon *)tb, key, current_node) < 0) {
- current_node = GET_LEFT_ACQB(current_node);
+ while (current != child) {
+ prev = current;
+ if (cmp_key_route(key, current) < 0) {
+ current = GET_LEFT_ACQB(current);
} else {
- current_node = GET_RIGHT_ACQB(current_node);
+ current = GET_RIGHT_ACQB(current);
}
}
- return prev_node;
+ return prev;
}
@@ -953,11 +999,7 @@ leftmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_LEFT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -969,11 +1011,7 @@ rightmost_route_node(DbTableCATreeNode *root)
prev_node = node;
node = GET_RIGHT_ACQB(node);
}
- if (prev_node == NULL) {
- return NULL;
- } else {
- return prev_node;
- }
+ return prev_node;
}
static ERTS_INLINE DbTableCATreeNode*
@@ -988,8 +1026,7 @@ leftmost_base_node_and_path(DbTableCATreeNode *root, CATreeNodeStack * stack)
}
static ERTS_INLINE DbTableCATreeNode*
-get_next_base_node_and_path(DbTableCommon *common_table_data,
- DbTableCATreeNode *base_node,
+get_next_base_node_and_path(DbTableCATreeNode *base_node,
CATreeNodeStack *stack)
{
if (EMPTY_NODE(stack)) { /* The parent of b is the root */
@@ -1001,11 +1038,11 @@ get_next_base_node_and_path(DbTableCommon *common_table_data,
stack);
} else {
Eterm pkey =
- TOP_NODE(stack)->baseOrRoute.route.key.tpl[0]; /* pKey = key of parent */
+ TOP_NODE(stack)->u.route.key.term; /* pKey = key of parent */
POP_NODE(stack);
while (!EMPTY_NODE(stack)) {
- if (TOP_NODE(stack)->baseOrRoute.route.is_valid &&
- cmp_key_route(common_table_data, pkey, TOP_NODE(stack)) <= 0) {
+ if (TOP_NODE(stack)->u.route.is_valid &&
+ cmp_key_route(pkey, TOP_NODE(stack)) <= 0) {
return leftmost_base_node_and_path(GET_RIGHT_ACQB(TOP_NODE(stack)), stack);
} else {
POP_NODE(stack);
@@ -1034,25 +1071,23 @@ lock_first_base_node(DbTable *tbl,
CATreeNodeStack *search_stack_ptr,
CATreeNodeStack *locked_base_nodes_stack_ptr)
{
- int retry;
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- do {
- retry = 0;
+ while (1) {
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
current_node = GET_LEFT_ACQB(current_node);
}
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
- /* Retry */
- runlock_base_node(base_node);
- retry = 1;
- }
- } while(retry);
+ if (base_node->is_valid)
+ break;
+ /* Retry */
+ runlock_base_node(base_node);
+ search_stack_ptr->pos = 0;
+ }
push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return current_node;
}
@@ -1066,19 +1101,20 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
CATreeNodeStack * tmp_stack_ptr;
- DbTableCommon* common_table_data;
- retry_find_and_lock_next_base_node:
- current_node = TOP_NODE(locked_base_nodes_stack_ptr);
- common_table_data = &tbl->common;
- clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
- current_node =
- get_next_base_node_and_path(common_table_data, current_node, *search_stack_ptr_ptr);
- if (current_node == NULL) {
- return NULL;
- }
- base_node = &current_node->baseOrRoute.base;
- rlock_base_node(base_node);
- if ( ! base_node->is_valid ) {
+
+ while (1) {
+ current_node = TOP_NODE(locked_base_nodes_stack_ptr);
+ clone_stack(*search_stack_ptr_ptr, *search_stack_copy_ptr_ptr);
+ current_node =
+ get_next_base_node_and_path(current_node, *search_stack_ptr_ptr);
+ if (current_node == NULL) {
+ return NULL;
+ }
+ base_node = &current_node->u.base;
+ rlock_base_node(base_node);
+ if (base_node->is_valid)
+ break;
+
/* Retry */
runlock_base_node(base_node);
/* Revert to previous state */
@@ -1086,10 +1122,9 @@ find_and_lock_next_base_node_and_path(DbTable *tbl,
tmp_stack_ptr = *search_stack_ptr_ptr;
*search_stack_ptr_ptr = *search_stack_copy_ptr_ptr;
*search_stack_copy_ptr_ptr = tmp_stack_ptr;
- goto retry_find_and_lock_next_base_node;
- } else {
- push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
}
+
+ push_node_dyn_array(tbl, locked_base_nodes_stack_ptr, current_node);
return base_node;
}
@@ -1102,7 +1137,7 @@ void unlock_and_release_locked_base_node_stack(DbTable *tbl,
int i;
for (i = 0; i < locked_base_nodes_stack_ptr->pos; i++) {
current_node = locked_base_nodes_stack_ptr->array[i];
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
if (locked_base_nodes_stack_ptr->pos > 1) {
base_node->lock_statistics = /* This is not atomic which is fine as */
base_node->lock_statistics + /* correctness does not depend on that. */
@@ -1166,19 +1201,18 @@ lock_base_node_with_key(DbTable *tbl,
DbTableCATreeNode *current_node;
DbTableCATreeBaseNode *base_node;
DbTableCATree* tb = &tbl->catree;
- DbTableCommon* common_table_data = &tbl->common;
do {
retry = 0;
current_node = GET_ROOT_ACQB(tb);
while ( ! current_node->is_base_node ) {
PUSH_NODE(search_stack_ptr, current_node);
- if( cmp_key_route(common_table_data,key,current_node) < 0 ) {
+ if( cmp_key_route(key,current_node) < 0 ) {
current_node = GET_LEFT_ACQB(current_node);
} else {
current_node = GET_RIGHT_ACQB(current_node);
}
}
- base_node = &current_node->baseOrRoute.base;
+ base_node = &current_node->u.base;
rlock_base_node(base_node);
if ( ! base_node->is_valid ) {
/* Retry */
@@ -1196,100 +1230,90 @@ lock_base_node_with_key(DbTable *tbl,
* node to join with.
*/
static DbTableCATreeNode*
-erl_db_catree_force_join_right(DbTableCommon *common_table_data,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container,
+erl_db_catree_force_join_right(DbTableCATree *tb,
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz,
DbTableCATreeNode **result_parent_wb)
{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- int neighbour_not_valid;
- if (parent_container == NULL) {
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+ TreeDbTerm* new_root;
+
+ if (parent == NULL) {
return NULL;
}
- parent = &parent_container->baseOrRoute.route;
- do {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- wlock_base_node_no_stats(neighbor_base);
- neighbour_not_valid = !neighbor_base->is_valid;
- if (neighbour_not_valid) {
- wunlock_base_node(neighbor_base);
- }
- } while (neighbour_not_valid);
+ ASSERT(thiz == GET_LEFT(parent));
+ while (1) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ wlock_base_node_no_stats(&neighbor->u.base);
+ if (neighbor->u.base.is_valid)
+ break;
+ wunlock_base_node(&neighbor->u.base);
+ }
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
- gparent = NULL;
- gparent_container = NULL;
- do {
- if (gparent != NULL) {
- unlock_route_node(gparent);
- }
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
- lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
+ while (1) {
+ gparent = parent_of(tb, parent);
+ if (gparent == NULL)
+ break;
+ lock_route_node(gparent);
+ if (gparent->u.route.is_valid)
+ break;
+ unlock_route_node(gparent);
+ }
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- wlock_base_node_no_stats(&(new_neighbor_base->baseOrRoute.base));
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+
+ new_root = join_trees(thiz->u.base.root, neighbor->u.base.root);
+ new_neighbor = create_wlocked_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
- if(neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if(neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-
- if (parent_container == neighbor_base_container) {
- *result_parent_wb = gparent_container;
- } else {
- *result_parent_wb = neighbor_base_parent;
- }
- return new_neighbor_base;
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node(thiz->u.base.lc_key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node(neighbor->u.base.lc_key.size));
+
+ *result_parent_wb = neighbor_parent;
+ return new_neighbor;
}
/*
@@ -1298,243 +1322,232 @@ erl_db_catree_force_join_right(DbTableCommon *common_table_data,
* locked state.
*/
static DbTableCATreeNode *
-merge_to_one_locked_base_node(DbTableCommon * common_table_data)
+merge_to_one_locked_base_node(DbTableCATree* tb)
{
- DbTableCATreeNode *parent_container;
- DbTableCATreeNode *new_parent_container;
- DbTableCATree *tb = (DbTableCATree *)common_table_data;
- DbTableCATreeNode *base_container;
- DbTableCATreeNode *new_base_container;
+ DbTableCATreeNode *parent;
+ DbTableCATreeNode *new_parent;
+ DbTableCATreeNode *base;
+ DbTableCATreeNode *new_base;
int is_not_valid;
/* Find first base node */
do {
- parent_container = NULL;
- base_container = GET_ROOT_ACQB(tb);
- while ( ! base_container->is_base_node ) {
- parent_container = base_container;
- base_container = GET_LEFT_ACQB(base_container);
+ parent = NULL;
+ base = GET_ROOT_ACQB(tb);
+ while ( ! base->is_base_node ) {
+ parent = base;
+ base = GET_LEFT_ACQB(base);
}
- wlock_base_node_no_stats(&(base_container->baseOrRoute.base));
- is_not_valid = ! base_container->baseOrRoute.base.is_valid;
+ wlock_base_node_no_stats(&(base->u.base));
+ is_not_valid = ! base->u.base.is_valid;
if (is_not_valid) {
- wunlock_base_node(&(base_container->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
}
} while(is_not_valid);
do {
- new_base_container = erl_db_catree_force_join_right(common_table_data,
- parent_container,
- base_container,
- &new_parent_container);
- if (new_base_container != NULL) {
- base_container = new_base_container;
- parent_container = new_parent_container;
+ new_base = erl_db_catree_force_join_right(tb,
+ parent,
+ base,
+ &new_parent);
+ if (new_base != NULL) {
+ base = new_base;
+ parent = new_parent;
}
- } while(new_base_container != NULL);
- return base_container;
+ } while(new_base != NULL);
+ return base;
}
static void join_catree(DbTableCATree *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container)
-{
- DbTableCATreeRouteNode *parent;
- DbTableCATreeNode *gparent_container;
- DbTableCATreeRouteNode *gparent;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeNode *neighbor_base_container;
- DbTableCATreeBaseNode *neighbor_base;
- DbTableCATreeNode *new_neighbor_base;
- DbTableCATreeNode *neighbor_base_parent;
- if (parent_container == NULL) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ DbTableCATreeNode *parent,
+ DbTableCATreeNode *thiz)
+{
+ DbTableCATreeNode *gparent;
+ DbTableCATreeNode *neighbor;
+ DbTableCATreeNode *new_neighbor;
+ DbTableCATreeNode *neighbor_parent;
+
+ ASSERT(thiz->is_base_node);
+ if (parent == NULL) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
}
- parent = &parent_container->baseOrRoute.route;
- if (GET_LEFT(parent_container) == base_container) {
- neighbor_base_container = leftmost_base_node(GET_RIGHT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(!parent->is_base_node);
+ if (GET_LEFT(parent) == thiz) {
+ neighbor = leftmost_base_node(GET_RIGHT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL)
lock_route_node(gparent);
- } else {
- gparent = NULL;
- }
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
+
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_RIGHT(parent_container));
- } else if (GET_LEFT(gparent_container) == parent_container) {
- SET_LEFT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_ROOT_RELB(tb, GET_RIGHT(parent));
+ } else if (GET_LEFT(gparent) == parent) {
+ SET_LEFT_RELB(gparent, GET_RIGHT(parent));
} else {
- SET_RIGHT_RELB(gparent_container, GET_RIGHT(parent_container));
+ SET_RIGHT_RELB(gparent, GET_RIGHT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(base->root, neighbor_base->root);
- neighbor_base_parent = NULL;
- if (GET_RIGHT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(thiz->u.base.root,
+ neighbor->u.base.root);
+ new_neighbor = create_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+ }
+ if (GET_RIGHT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- leftmost_route_node(GET_RIGHT(parent_container));
+ neighbor_parent = leftmost_route_node(GET_RIGHT(parent));
}
}
} else { /* Symetric case */
- neighbor_base_container = rightmost_base_node(GET_LEFT_ACQB(parent_container));
- neighbor_base = &neighbor_base_container->baseOrRoute.base;
- if (try_wlock_base_node(neighbor_base)) {
+ ASSERT(GET_RIGHT(parent) == thiz);
+ neighbor = rightmost_base_node(GET_LEFT_ACQB(parent));
+ if (try_wlock_base_node(&neighbor->u.base)) {
/* Failed to acquire lock */
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
return;
- } else if (!neighbor_base->is_valid) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ } else if (!neighbor->u.base.is_valid) {
+ thiz->u.base.lock_statistics = 0;
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
return;
} else {
lock_route_node(parent);
- parent->is_valid = 0;
- neighbor_base->is_valid = 0;
- base->is_valid = 0;
+ parent->u.route.is_valid = 0;
+ neighbor->u.base.is_valid = 0;
+ thiz->u.base.is_valid = 0;
gparent = NULL;
- gparent_container = NULL;
do {
if (gparent != NULL) {
unlock_route_node(gparent);
}
- gparent_container = parent_of(tb, parent_container);
- if (gparent_container != NULL) {
- gparent = &gparent_container->baseOrRoute.route;
+ gparent = parent_of(tb, parent);
+ if (gparent != NULL) {
lock_route_node(gparent);
} else {
gparent = NULL;
}
- } while (gparent != NULL && !gparent->is_valid);
+ } while (gparent != NULL && !gparent->u.route.is_valid);
if (gparent == NULL) {
- SET_ROOT_RELB(tb, GET_LEFT(parent_container));
- } else if (GET_RIGHT(gparent_container) == parent_container) {
- SET_RIGHT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_ROOT_RELB(tb, GET_LEFT(parent));
+ } else if (GET_RIGHT(gparent) == parent) {
+ SET_RIGHT_RELB(gparent, GET_LEFT(parent));
} else {
- SET_LEFT_RELB(gparent_container, GET_LEFT(parent_container));
+ SET_LEFT_RELB(gparent, GET_LEFT(parent));
}
unlock_route_node(parent);
if (gparent != NULL) {
unlock_route_node(gparent);
}
- new_neighbor_base = create_catree_base_node(tb);
- new_neighbor_base->baseOrRoute.base.root =
- join_trees(neighbor_base->root, base->root);
- neighbor_base_parent = NULL;
- if (GET_LEFT(parent_container) == neighbor_base_container) {
- neighbor_base_parent = gparent_container;
+ {
+ TreeDbTerm* new_root = join_trees(neighbor->u.base.root,
+ thiz->u.base.root);
+ new_neighbor = create_base_node(tb, new_root,
+ LC_ORDER(thiz->u.base.lc_key.term));
+ }
+ if (GET_LEFT(parent) == neighbor) {
+ neighbor_parent = gparent;
} else {
- neighbor_base_parent =
- rightmost_route_node(GET_LEFT(parent_container));
+ neighbor_parent =
+ rightmost_route_node(GET_LEFT(parent));
}
}
}
/* Link in new neighbor and free nodes that are no longer in the tree */
- if (neighbor_base_parent == NULL) {
- SET_ROOT_RELB(tb, new_neighbor_base);
- } else if (GET_LEFT(neighbor_base_parent) == neighbor_base_container) {
- SET_LEFT_RELB(neighbor_base_parent, new_neighbor_base);
+ if (neighbor_parent == NULL) {
+ SET_ROOT_RELB(tb, new_neighbor);
+ } else if (GET_LEFT(neighbor_parent) == neighbor) {
+ SET_LEFT_RELB(neighbor_parent, new_neighbor);
} else {
- SET_RIGHT_RELB(neighbor_base_parent, new_neighbor_base);
+ SET_RIGHT_RELB(neighbor_parent, new_neighbor);
}
- wunlock_base_node(base);
- wunlock_base_node(neighbor_base);
+ wunlock_base_node(&thiz->u.base);
+ wunlock_base_node(&neighbor->u.base);
/* Free the parent and base */
- erts_schedule_thr_prgr_later_op(free_catree_routing_node,
- parent_container,
- &parent->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- neighbor_base_container,
- &neighbor_base->free_item);
-}
-
-
-static void split_catree(DbTableCommon *tb,
- DbTableCATreeNode *parent_container,
- DbTableCATreeNode *base_container) {
+ erts_schedule_db_free(&tb->common,
+ do_free_route_node,
+ parent,
+ &parent->u.route.free_item,
+ sizeof_route_node(parent->u.route.key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ thiz,
+ &thiz->u.base.free_item,
+ sizeof_base_node(thiz->u.base.lc_key.size));
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ neighbor,
+ &neighbor->u.base.free_item,
+ sizeof_base_node(neighbor->u.base.lc_key.size));
+}
+
+static void split_catree(DbTableCATree *tb,
+ DbTableCATreeNode* ERTS_RESTRICT parent,
+ DbTableCATreeNode* ERTS_RESTRICT base) {
TreeDbTerm *splitOutWriteBack;
- TreeDbTerm *leftWriteBack;
- TreeDbTerm *rightWriteBack;
- DbTableCATreeNode *left_base_node;
- DbTableCATreeNode *right_base_node;
- DbTableCATreeNode *routing_node_container;
- DbTableCATreeBaseNode *base = &base_container->baseOrRoute.base;
- DbTableCATreeRouteNode *parent;
- if (parent_container == NULL) {
- parent = NULL;
- } else {
- parent = &parent_container->baseOrRoute.route;
- }
+ DbTableCATreeNode* ERTS_RESTRICT new_left;
+ DbTableCATreeNode* ERTS_RESTRICT new_right;
+ DbTableCATreeNode* ERTS_RESTRICT new_route;
- if (less_than_two_elements(base->root)) {
- base->lock_statistics = 0;
- wunlock_base_node(base);
+ if (less_than_two_elements(base->u.base.root)) {
+ base->u.base.lock_statistics = 0;
+ wunlock_base_node(&base->u.base);
return;
} else {
- /* Split the tree */
- split_tree(tb,
- base->root,
- &splitOutWriteBack,
- &leftWriteBack,
- &rightWriteBack);
- /* Create new base nodes */
- left_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- right_base_node =
- create_catree_base_node((DbTableCATree*)tb);
- left_base_node->baseOrRoute.base.root = leftWriteBack;
- right_base_node->baseOrRoute.base.root = rightWriteBack;
- routing_node_container = create_catree_route_node(tb,
- left_base_node,
- right_base_node,
- &splitOutWriteBack->dbterm);
+ TreeDbTerm *left_tree;
+ TreeDbTerm *right_tree;
+
+ split_tree(tb, base->u.base.root, &splitOutWriteBack,
+ &left_tree, &right_tree);
+
+ new_left = create_base_node(tb, left_tree,
+ LC_ORDER(GETKEY(tb, left_tree->dbterm.tpl)));
+ new_right = create_base_node(tb, right_tree,
+ LC_ORDER(GETKEY(tb, right_tree->dbterm.tpl)));
+ new_route = create_route_node(tb,
+ new_left,
+ new_right,
+ &splitOutWriteBack->dbterm,
+ parent);
if (parent == NULL) {
- SET_ROOT_RELB((DbTableCATree*)tb, routing_node_container);
- } else if(GET_LEFT(parent_container) == base_container) {
- SET_LEFT_RELB(parent_container, routing_node_container);
+ SET_ROOT_RELB(tb, new_route);
+ } else if(GET_LEFT(parent) == base) {
+ SET_LEFT_RELB(parent, new_route);
} else {
- SET_RIGHT_RELB(parent_container, routing_node_container);
+ SET_RIGHT_RELB(parent, new_route);
}
- base->is_valid = 0;
- wunlock_base_node(base);
- erts_schedule_thr_prgr_later_op(free_catree_base_node,
- base_container,
- &base->free_item);
+ base->u.base.is_valid = 0;
+ wunlock_base_node(&base->u.base);
+ erts_schedule_db_free(&tb->common,
+ do_free_base_node,
+ base,
+ &base->u.base.free_item,
+ sizeof_base_node(base->u.base.lc_key.size));
}
}
@@ -1546,7 +1559,7 @@ static void catree_add_base_node_to_free_list(
DbTableCATree *tb,
DbTableCATreeNode *base_node_container)
{
- base_node_container->baseOrRoute.base.next =
+ base_node_container->u.base.next =
tb->base_nodes_to_free_list;
tb->base_nodes_to_free_list = base_node_container;
}
@@ -1557,7 +1570,7 @@ static void catree_deque_base_node_from_free_list(DbTableCATree *tb)
return; /* List empty */
} else {
DbTableCATreeNode *first = tb->base_nodes_to_free_list;
- tb->base_nodes_to_free_list = first->baseOrRoute.base.next;
+ tb->base_nodes_to_free_list = first->u.base.next;
}
}
@@ -1596,7 +1609,7 @@ static SWord do_free_routing_nodes_catree_cont(DbTableCATree *tb, SWord num_left
PUSH_NODE(&tb->free_stack_rnodes, root);
root = p;
} else {
- free_catree_routing_node(root);
+ free_catree_route_node(tb, root);
if (--num_left >= 0) {
break;
} else {
@@ -1637,10 +1650,10 @@ static SWord do_free_base_node_cont(DbTableCATree *tb, SWord num_left)
}
}
catree_deque_base_node_from_free_list(tb);
- free_catree_base_node(base_node_container);
+ free_catree_base_node(tb, base_node_container);
base_node_container = catree_first_base_node_from_free_list(tb);
if (base_node_container != NULL) {
- PUSH_NODE(&tb->free_stack_elems, base_node_container->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, base_node_container->u.base.root);
}
return num_left;
}
@@ -1662,7 +1675,7 @@ void db_initialize_catree(void)
int db_create_catree(Process *p, DbTable *tbl)
{
DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *root = create_catree_base_node(tb);
+ DbTableCATreeNode *root = create_base_node(tb, NULL, NIL);
tb->deletion = 0;
tb->base_nodes_to_free_list = NULL;
erts_atomic_init_relb(&(tb->root), (erts_aint_t)root);
@@ -1675,7 +1688,7 @@ static int db_first_catree(Process *p, DbTable *tbl, Eterm *ret)
int result;
DECLARE_AND_INIT_BASE_NODE_SEARCH_STACKS;
/* Find first base node */
- base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->baseOrRoute.base);
+ base_node = &(lock_first_base_node(tbl, &search_stack, &locked_base_nodes_stack)->u.base);
/* Find next base node until non-empty base node is found */
while (base_node != NULL && base_node->root == NULL) {
base_node = find_and_lock_next_base_node_and_path(tbl, &search_stack_ptr, &search_stack_copy_ptr, locked_base_nodes_stack_ptr);
@@ -1716,10 +1729,9 @@ static int db_next_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static int db_last_catree(Process *p, DbTable *tbl, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- int result = db_last_tree_common(p, tbl, base->baseOrRoute.base.root, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ int result = db_last_tree_common(p, tbl, base->u.base.root, ret, NULL);
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1728,13 +1740,12 @@ static int db_prev_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_prev_tree_common(p, tbl, base->baseOrRoute.base.root, key, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_prev_tree_common(p, tbl, base->u.base.root, key, ret, &stack);
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1835,13 +1846,12 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object, Eterm *ret)
static int db_slot_catree(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_slot_tree_common(p, tbl, base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_slot_tree_common(p, tbl, base->u.base.root,
slot_term, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1850,13 +1860,12 @@ static int db_select_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_continue_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_continue_tree_common(p, &tbl->common, &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1864,14 +1873,13 @@ static int db_select_continue_catree(Process *p,
static int db_select_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, reverse, ret,
NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1880,27 +1888,25 @@ static int db_select_count_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_continue_tree_common(p, &tbl->common,
+ &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_count_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_count_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1908,13 +1914,12 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reversed, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_chunk_tree_common(p, &tb->common, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_chunk_tree_common(p, &tbl->common, &base->u.base.root,
tid, pattern, chunk_size, reversed, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -1923,60 +1928,56 @@ static int db_select_delete_continue_catree(Process *p,
Eterm continuation,
Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_continue_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_delete_continue_tree_common(p, tbl, &base->u.base.root,
continuation, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
DbTreeStack stack;
TreeDbTerm * stack_array[STACK_NEED];
int result;
DbTableCATreeNode *base;
init_tree_stack(&stack, stack_array, 0);
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_delete_tree_common(p, tbl, &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_delete_tree_common(p, tbl, &base->u.base.root,
tid, pattern, ret, &stack);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_tree_common(p, &tbl->common,
+ &base->u.base.root,
tid, pattern, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
static int db_select_replace_continue_catree(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret)
{
- DbTableCATree *tb = &tbl->catree;
int result;
DbTableCATreeNode *base;
- base = merge_to_one_locked_base_node(&tb->common);
- result = db_select_replace_continue_tree_common(p, &tb->common,
- &base->baseOrRoute.base.root,
+ base = merge_to_one_locked_base_node(&tbl->catree);
+ result = db_select_replace_continue_tree_common(p, &tbl->common,
+ &base->u.base.root,
continuation, ret, NULL);
- wunlock_base_node(&(base->baseOrRoute.base));
+ wunlock_base_node(&(base->u.base));
return result;
}
@@ -2002,10 +2003,9 @@ static int db_take_catree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
static void db_print_catree(fmtfn_t to, void *to_arg,
int show, DbTable *tbl)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_print_tree_common(to, to_arg, show, base->baseOrRoute.base.root, tbl);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ db_print_tree_common(to, to_arg, show, base->u.base.root, tbl);
+ wunlock_base_node(&(base->u.base));
}
/* Release all memory occupied by a single table */
@@ -2045,7 +2045,7 @@ static SWord db_free_table_continue_catree(DbTable *tbl, SWord reds)
} else {
tb->is_routing_nodes_freed = 1; /* Ready with the routing nodes */
first_base_node = catree_first_base_node_from_free_list(tb);
- PUSH_NODE(&tb->free_stack_elems, first_base_node->baseOrRoute.base.root);
+ PUSH_NODE(&tb->free_stack_elems, first_base_node->u.base.root);
}
}
while (catree_first_base_node_from_free_list(tb) != NULL) {
@@ -2081,10 +2081,9 @@ static void db_foreach_offheap_catree(DbTable *tbl,
void (*func)(ErlOffHeap *, void *),
void *arg)
{
- DbTableCATree *tb = &tbl->catree;
- DbTableCATreeNode *base = merge_to_one_locked_base_node(&tb->common);
- db_foreach_offheap_tree_common(base->baseOrRoute.base.root, func, arg);
- wunlock_base_node(&(base->baseOrRoute.base));
+ DbTableCATreeNode *base = merge_to_one_locked_base_node(&tbl->catree);
+ db_foreach_offheap_tree_common(base->u.base.root, func, arg);
+ wunlock_base_node(&(base->u.base));
}
static int db_lookup_dbterm_catree(Process *p, DbTable *tbl, Eterm key, Eterm obj,
@@ -2111,7 +2110,7 @@ static void db_finalize_dbterm_catree(int cret, DbUpdateHandle *handle)
DbTableCATreeNode *prev_node = handle->lck;
DbTableCATreeNode *current_node = handle->lck2;
int current_level = handle->current_level;
- DbTableCATreeBaseNode *base_node = &current_node->baseOrRoute.base;
+ DbTableCATreeBaseNode *base_node = &current_node->u.base;
db_finalize_dbterm_tree_common(cret, handle, NULL);
ERL_DB_CATREE_CREATE_DO_OPERATION_FUNCTION_ADAPT_AND_UNLOCK_PART;
return;
diff --git a/erts/emulator/beam/erl_db_catree.h b/erts/emulator/beam/erl_db_catree.h
index 1f2c7da091..f9c0784289 100644
--- a/erts/emulator/beam/erl_db_catree.h
+++ b/erts/emulator/beam/erl_db_catree.h
@@ -34,23 +34,36 @@
struct DbTableCATreeNode;
typedef struct {
+ Eterm term;
+ struct erl_off_heap_header* oh;
+ Uint size;
+ Eterm heap[1];
+} DbRouteKey;
+
+typedef struct {
erts_rwmtx_t lock; /* The lock for this base node */
Sint lock_statistics;
int is_valid; /* If this base node is still valid */
TreeDbTerm *root; /* The root of the sequential tree */
- DbTable * tab; /* Table ptr, used when freeing using thread progress */
ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */
struct DbTableCATreeNode * next; /* Used when gradually deleting */
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ DbRouteKey lc_key;
+#endif
+ char end_of_struct__;
} DbTableCATreeBaseNode;
typedef struct {
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ Sint lc_order;
+#endif
ErtsThrPrgrLaterOp free_item; /* Used when freeing using thread progress */
- DbTable* tab; /* Table ptr, used when freeing using thread progress */
erts_mtx_t lock; /* Used when joining route nodes */
int is_valid; /* If this route node is still valid */
erts_atomic_t left;
erts_atomic_t right;
- DbTerm key;
+ DbRouteKey key;
} DbTableCATreeRouteNode;
typedef struct DbTableCATreeNode {
@@ -58,7 +71,7 @@ typedef struct DbTableCATreeNode {
union {
DbTableCATreeRouteNode route;
DbTableCATreeBaseNode base;
- } baseOrRoute;
+ } u;
} DbTableCATreeNode;
typedef struct {
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index 752d3ae3a8..61806876a7 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -2731,13 +2731,9 @@ static int free_seg(DbTableHash *tb, int free_records)
* sure no lingering threads are still hanging in BUCKET macro
* with an old segtab pointer.
*/
- Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
- ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
- ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
- erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
- est,
- &est->lop,
- sz);
+ erts_schedule_db_free(&tb->common, dealloc_ext_segtab,
+ est, &est->lop,
+ SIZEOF_EXT_SEGTAB(est->nsegs));
}
else
erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 3b10ef8c25..987e370341 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -42,6 +42,7 @@
#include "erl_term.h"
#include "erl_threads.h"
#include "erl_atom_table.h"
+#include "erl_utils.h"
typedef struct {
char *name;
@@ -91,8 +92,8 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "db_tab", "address" },
{ "db_tab_fix", "address" },
{ "db_hash_slot", "address" },
- { "erl_db_catree_base_node", "dynamic" },
- { "erl_db_catree_route_node", "dynamic" },
+ { "erl_db_catree_base_node", "term" },
+ { "erl_db_catree_route_node", "index" },
{ "resource_monitors", "address" },
{ "driver_list", NULL },
{ "proc_msgq", "pid" },
@@ -709,26 +710,14 @@ erts_lc_get_lock_order_id(char *name)
return (Sint16) -1;
}
-int
-erts_lc_is_check_order(char *name)
+static int
+lc_is_term_order(Sint16 id)
{
- int i;
- if (!name || name[0] == '\0')
- erts_fprintf(stderr, "Missing lock name\n");
-
- for (i = 0; i < ERTS_LOCK_ORDER_SIZE; i++) {
- if (sys_strcmp(erts_lock_order[i].name, name) == 0) {
- if (erts_lock_order[i].internal_order != NULL &&
- sys_strcmp(erts_lock_order[i].internal_order, "dynamic") == 0) {
- return 0;
- }else{
- return 1;
- }
- }
- }
- return 1;
+ return erts_lock_order[id].internal_order != NULL
+ && sys_strcmp(erts_lock_order[id].internal_order, "term") == 0;
}
+
static int compare_locked_by_id(lc_locked_lock_t *locked_lock, erts_lc_lock_t *comparand)
{
if(locked_lock->id < comparand->id) {
@@ -740,18 +729,23 @@ static int compare_locked_by_id(lc_locked_lock_t *locked_lock, erts_lc_lock_t *c
return 0;
}
-static int compare_locked_by_id_extra(lc_locked_lock_t *locked_lock, erts_lc_lock_t *comparand)
+static int compare_locked_by_id_extra(lc_locked_lock_t *ll, erts_lc_lock_t *comparand)
{
- int order = compare_locked_by_id(locked_lock, comparand);
+ int order = compare_locked_by_id(ll, comparand);
if(order) {
return order;
- } else if(locked_lock->extra < comparand->extra) {
+ }
+ if (ll->flags & ERTS_LOCK_FLAGS_PROPERTY_TERM_ORDER) {
+ ASSERT(!is_header(ll->extra) && !is_header(comparand->extra));
+ return CMP(ll->extra, comparand->extra);
+ }
+
+ if(ll->extra < comparand->extra) {
return -1;
- } else if(locked_lock->extra > comparand->extra) {
+ } else if(ll->extra > comparand->extra) {
return 1;
}
-
return 0;
}
@@ -990,7 +984,8 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
return 0;
}
else {
- lc_locked_lock_t *tl_lck;
+ lc_locked_lock_t *ll;
+ int order;
ASSERT(thr->locked.last);
@@ -999,26 +994,27 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
type_order_violation("trylocking ", thr, lck);
#endif
- if (thr->locked.last->id < lck->id
- || (thr->locked.last->id == lck->id
- && thr->locked.last->extra < lck->extra))
- return 0;
+ ll = thr->locked.last;
+ order = compare_locked_by_id_extra(ll, lck);
+
+ if (order < 0)
+ return 0;
/*
- * Lock order violation
+ * TryLock order violation
*/
-
- if (lck->check_order) {
- /* Check that we are not trying to lock this lock twice */
- for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
- if (tl_lck->id < lck->id
- || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) {
- if (tl_lck->id == lck->id && tl_lck->extra == lck->extra)
- lock_twice("Trylocking", thr, lck, options);
- break;
- }
+ /* Check that we are not trying to lock this lock twice */
+ while (1) {
+ if (order <= 0) {
+ if (order == 0)
+ lock_twice("Trylocking", thr, lck, options);
+ break;
}
+ ll = ll->prev;
+ if (!ll)
+ break;
+ order = compare_locked_by_id_extra(ll, lck);
}
#ifndef ERTS_LC_ALLWAYS_FORCE_BUSY_TRYLOCK_ON_LOCK_ORDER_VIOLATION
@@ -1038,6 +1034,11 @@ erts_lc_trylock_force_busy_flg(erts_lc_lock_t *lck, erts_lock_options_t options)
#endif
}
+/*
+ * locked = 0 trylock failed
+ * locked > 0 trylock succeeded
+ * locked < 0 prelocking of newly created lock (no lock order check)
+ */
void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t options,
char *file, unsigned int line)
{
@@ -1066,9 +1067,9 @@ void erts_lc_trylock_flg_x(int locked, erts_lc_lock_t *lck, erts_lock_options_t
#endif
for (tl_lck = thr->locked.last; tl_lck; tl_lck = tl_lck->prev) {
- if (tl_lck->id < lck->id
- || (tl_lck->id == lck->id && tl_lck->extra <= lck->extra)) {
- if (tl_lck->id == lck->id && tl_lck->extra == lck->extra && lck->check_order)
+ int order = compare_locked_by_id_extra(tl_lck, lck);
+ if (order <= 0) {
+ if (order == 0 && locked >= 0)
lock_twice("Trylocking", thr, lck, options);
if (locked) {
ll->next = tl_lck->next;
@@ -1111,10 +1112,10 @@ void erts_lc_require_lock_flg(erts_lc_lock_t *lck, erts_lock_options_t options,
for (l_lck2 = thr->required.last;
l_lck2;
l_lck2 = l_lck2->prev) {
- if (l_lck2->id < lck->id
- || (l_lck2->id == lck->id && l_lck2->extra < lck->extra))
+ int order = compare_locked_by_id_extra(l_lck2, lck);
+ if (order < 0)
break;
- else if (l_lck2->id == lck->id && l_lck2->extra == lck->extra)
+ if (order == 0)
require_twice(thr, lck);
}
if (!l_lck2) {
@@ -1172,6 +1173,7 @@ void erts_lc_lock_flg_x(erts_lc_lock_t *lck, erts_lock_options_t options,
{
lc_thread_t *thr;
lc_locked_lock_t *new_ll;
+ int order;
if (lck->inited != ERTS_LC_INITITALIZED)
uninitialized_lock();
@@ -1187,11 +1189,10 @@ void erts_lc_lock_flg_x(erts_lc_lock_t *lck, erts_lock_options_t options,
thr->locked.last = thr->locked.first = new_ll;
ASSERT(0 < lck->id && lck->id < ERTS_LOCK_ORDER_SIZE);
thr->matrix.m[lck->id][0] = 1;
+ return;
}
- else if (( ! lck->check_order && thr->locked.last->id == lck->id) ||
- (thr->locked.last->id < lck->id
- || (thr->locked.last->id == lck->id
- && thr->locked.last->extra < lck->extra))) {
+ order = compare_locked_by_id_extra(thr->locked.last, lck);
+ if (order < 0) {
lc_locked_lock_t* ll;
if (LOCK_IS_TYPE_ORDER_VIOLATION(lck->flags, thr->locked.last->flags)) {
type_order_violation("locking ", thr, lck);
@@ -1321,7 +1322,6 @@ void
erts_lc_init_lock(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags)
{
lck->id = erts_lc_get_lock_order_id(name);
- lck->check_order = erts_lc_is_check_order(name);
lck->extra = (UWord) &lck->extra;
ASSERT(is_not_immed(lck->extra));
lck->flags = flags;
@@ -1333,10 +1333,14 @@ void
erts_lc_init_lock_x(erts_lc_lock_t *lck, char *name, erts_lock_flags_t flags, Eterm extra)
{
lck->id = erts_lc_get_lock_order_id(name);
- lck->check_order = erts_lc_is_check_order(name);
lck->extra = extra;
- ASSERT(is_immed(lck->extra));
lck->flags = flags;
+ if (lc_is_term_order(lck->id)) {
+ lck->flags |= ERTS_LOCK_FLAGS_PROPERTY_TERM_ORDER;
+ ASSERT(!is_header(lck->extra));
+ }
+ else
+ ASSERT(is_immed(lck->extra));
lck->taken_options = 0;
lck->inited = ERTS_LC_INITITALIZED;
}
diff --git a/erts/emulator/beam/erl_lock_check.h b/erts/emulator/beam/erl_lock_check.h
index 472e88ad65..b32f27d9f9 100644
--- a/erts/emulator/beam/erl_lock_check.h
+++ b/erts/emulator/beam/erl_lock_check.h
@@ -46,7 +46,6 @@
typedef struct {
int inited;
Sint16 id;
- int check_order;
erts_lock_flags_t flags;
erts_lock_options_t taken_options;
UWord extra;
@@ -54,12 +53,11 @@ typedef struct {
#define ERTS_LC_INITITALIZED 0x7f7f7f7f
-#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), 1, (F), 0, (X)}
+#define ERTS_LC_LOCK_INIT(ID, X, F) {ERTS_LC_INITITALIZED, (ID), (F), 0, (X)}
void erts_lc_init(void);
void erts_lc_late_init(void);
Sint16 erts_lc_get_lock_order_id(char *name);
-int erts_lc_is_check_order(char *name);
void erts_lc_check(erts_lc_lock_t *have, int have_len,
erts_lc_lock_t *have_not, int have_not_len);
void erts_lc_check_exact(erts_lc_lock_t *have, int have_len);
@@ -106,7 +104,7 @@ Eterm erts_lc_dump_graph(void);
#define erts_lc_lock(lck) erts_lc_lock_x(lck,__FILE__,__LINE__)
#define erts_lc_trylock(res,lck) erts_lc_trylock_x(res,lck,__FILE__,__LINE__)
-#define erts_lc_lock_flg(lck) erts_lc_lock_flg_x(lck,__FILE__,__LINE__)
-#define erts_lc_trylock_flg(res,lck) erts_lc_trylock_flg_x(res,lck,__FILE__,__LINE__)
+#define erts_lc_lock_flg(lck,flg) erts_lc_lock_flg_x(lck,flg,__FILE__,__LINE__)
+#define erts_lc_trylock_flg(res,lck,flg) erts_lc_trylock_flg_x(res,lck,flg,__FILE__,__LINE__)
#endif /* #ifndef ERTS_LOCK_CHECK_H__ */
diff --git a/erts/emulator/beam/erl_lock_flags.h b/erts/emulator/beam/erl_lock_flags.h
index d711f69456..2db133b598 100644
--- a/erts/emulator/beam/erl_lock_flags.h
+++ b/erts/emulator/beam/erl_lock_flags.h
@@ -28,15 +28,17 @@
/* Property/category are bitfields to simplify their use in masks. */
#define ERTS_LOCK_FLAGS_MASK_CATEGORY (0xFFC0)
-#define ERTS_LOCK_FLAGS_MASK_PROPERTY (0x0030)
+#define ERTS_LOCK_FLAGS_MASK_PROPERTY (0x0038)
/* Type is a plain number. */
-#define ERTS_LOCK_FLAGS_MASK_TYPE (0x000F)
+#define ERTS_LOCK_FLAGS_MASK_TYPE (0x0007)
#define ERTS_LOCK_FLAGS_TYPE_SPINLOCK (1)
#define ERTS_LOCK_FLAGS_TYPE_MUTEX (2)
#define ERTS_LOCK_FLAGS_TYPE_PROCLOCK (3)
+/* Lock checker use real term order instead of raw word compare */
+#define ERTS_LOCK_FLAGS_PROPERTY_TERM_ORDER (1 << 3)
/* "Static" guarantees that the lock will never be destroyed once created. */
#define ERTS_LOCK_FLAGS_PROPERTY_STATIC (1 << 4)
#define ERTS_LOCK_FLAGS_PROPERTY_READ_WRITE (1 << 5)