aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_tree.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2018-10-18 11:29:06 +0200
committerSverker Eriksson <[email protected]>2018-10-23 12:36:29 +0200
commit77d3d262c5e7d784204a6d91b79ed2f46b4013ad (patch)
tree14a1b8f87c9ef617a966f95665e253af25a9e8ec /erts/emulator/beam/erl_db_tree.c
parent8cd07dae15569deea3a0b539057299a238c8ddc1 (diff)
downloadotp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.tar.gz
otp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.tar.bz2
otp-77d3d262c5e7d784204a6d91b79ed2f46b4013ad.zip
erts: Do contention adaptions during (updating) iterations
Once an iteration key has been found, never fall back to first/last key in next/prev tree as trees may split or join under our feet. I.e we must always use previous key when searching for the next key.
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r--erts/emulator/beam/erl_db_tree.c201
1 files changed, 124 insertions, 77 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
index 250d90e189..854b8d6329 100644
--- a/erts/emulator/beam/erl_db_tree.c
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -1046,7 +1046,7 @@ int db_select_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size);
+ sc.common.root = catree_find_nextprev_root(iter, !!reverse != !!chunk_size, NULL);
}
else
sc.common.root = &((DbTableTree*)tb)->root;
@@ -1354,7 +1354,7 @@ int db_select_count_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tb->tree.root;
@@ -1765,7 +1765,7 @@ int db_select_delete_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tbl->tree.root;
@@ -2001,7 +2001,7 @@ int db_select_replace_continue_tree_common(Process *p,
if (iter) {
iter->next_route_key = lastkey;
- sc.common.root = catree_find_prev_root(iter);
+ sc.common.root = catree_find_prev_root(iter, NULL);
}
else {
sc.common.root = &tbl->tree.root;
@@ -2734,8 +2734,22 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
{
TreeDbTerm *this;
TreeDbTerm *tmp;
+ TreeDbTerm *lastobj;
+ Eterm lastkey;
TreeDbTerm **pp;
- DbTreeStack* stack = get_any_stack(tb,stack_container);
+ DbTreeStack* stack;
+
+ if (iter) {
+ /* Find first non-empty tree */
+ while (!root) {
+ TreeDbTerm** pp = catree_find_next_root(iter, NULL);
+ if (!pp)
+ return NULL;
+ root = *pp;
+ }
+ }
+
+ stack = get_any_stack(tb,stack_container);
ASSERT(stack != NULL);
if (slot == 1) { /* Don't search from where we are if we are
@@ -2762,6 +2776,7 @@ static TreeDbTerm *slot_search(Process *p, TreeDbTerm *root,
this = TOP_NODE(stack);
while (stack->slot != slot) {
ASSERT(this);
+ lastobj = this;
if (slot > stack->slot) {
if (this->right != NULL) {
this = this->right;
@@ -2810,14 +2825,19 @@ next_root:
if (!iter)
break; /* EOT */
- pp = catree_find_next_root(iter);
+ ASSERT(slot > stack->slot);
+ if (lastobj) {
+ lastkey = GETKEY(tb, lastobj->dbterm.tpl);
+ lastobj = NULL;
+ }
+ pp = catree_find_next_root(iter, &lastkey);
if (!pp)
break; /* EOT */
root = *pp;
stack->pos = 0;
+ find_next(&tb->common, root, stack, lastkey);
}
-
release_stack(tb,stack_container,stack);
return this;
}
@@ -2952,7 +2972,8 @@ static TreeDbTerm *find_next_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
if (iter) {
*rootpp = catree_find_next_from_pb_key_root(key, iter);
- root = *rootpp ? **rootpp : NULL;
+ ASSERT(*rootpp);
+ root = **rootpp;
}
else {
*rootpp = &tbl->tree.root;
@@ -2991,7 +3012,8 @@ static TreeDbTerm *find_prev_from_pb_key(DbTable *tbl, TreeDbTerm*** rootpp,
if (iter) {
*rootpp = catree_find_prev_from_pb_key_root(key, iter);
- root = *rootpp ? **rootpp : NULL;
+ ASSERT(*rootpp);
+ root = **rootpp;
}
else {
*rootpp = &tbl->tree.root;
@@ -3212,36 +3234,45 @@ static void traverse_backwards(DbTableCommon *tb,
TreeDbTerm *this, *next;
TreeDbTerm** root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
- }
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
}
- this = TOP_NODE(stack);
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 0)))
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
+ context->root = root;
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
+ } else {
+ next = find_prev(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_prev(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
if (!((*doit)(tb, this, context, 0)))
return;
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_prev_root(iter);
- lastkey = THE_NON_VALUE;
- } while (root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
+ }
}
/*
@@ -3257,36 +3288,45 @@ static void traverse_forward(DbTableCommon *tb,
TreeDbTerm *this, *next;
TreeDbTerm **root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_next_root(iter, NULL);
+ if (!root)
+ return;
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->left;
- }
- this = TOP_NODE(stack);
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
- if (!((*doit)(tb, this, context, 1)))
- return;
- } else {
- next = find_next(tb, *root, stack, lastkey);
+ context->root = root;
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next != NULL) {
+ PUSH_NODE(stack, next);
+ next = next->left;
+ }
+ next = TOP_NODE(stack);
+ } else {
+ next = find_next(tb, *root, stack, lastkey);
+ }
- while ((this = next) != NULL) {
- next = find_next(tb, *root, stack, GETKEY(tb, this->dbterm.tpl));
+ while (1) {
+ while (next) {
+ this = next;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_next(tb, *root, stack, lastkey);
if (!((*doit)(tb, this, context, 1)))
return;
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_next_root(iter);
- lastkey = THE_NON_VALUE;
- } while(root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_next_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_next(tb, *root, stack, lastkey);
+ }
}
/*
@@ -3306,44 +3346,51 @@ static void traverse_update_backwards(DbTableCommon *tb,
TreeDbTerm *this, *next, **this_ptr;
TreeDbTerm** root = context->root;
- do {
- if (lastkey == THE_NON_VALUE) {
- stack->pos = stack->slot = 0;
- if (( this = *root ) == NULL) {
- goto next_root;
+ if (lastkey == THE_NON_VALUE) {
+ if (iter) {
+ while (*root == NULL) {
+ root = catree_find_prev_root(iter, NULL);
+ if (!root)
+ return;
+ context->root = root;
}
- while (this != NULL) {
- PUSH_NODE(stack, this);
- this = this->right;
- }
- this = TOP_NODE(stack);
- this_ptr = find_ptr(tb, root, stack, this);
- ASSERT(this_ptr != NULL);
- res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
- if (!res)
- return;
- } else {
- next = find_prev(tb, *root, stack, lastkey);
}
+ stack->pos = stack->slot = 0;
+ next = *root;
+ while (next) {
+ PUSH_NODE(stack, next);
+ next = next->right;
+ }
+ next = TOP_NODE(stack);
+ }
+ else
+ next = find_prev(tb, *root, stack, lastkey);
- while ((this = next) != NULL) {
+
+ while (1) {
+ while (next) {
+ this = next;
this_ptr = find_ptr(tb, root, stack, this);
ASSERT(this_ptr != NULL);
res = (*doit)(tb, this_ptr, context, 0);
- REPLACE_TOP_NODE(stack, *this_ptr);
- next = find_prev(tb, *root, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl));
+ this = *this_ptr;
+ REPLACE_TOP_NODE(stack, this);
if (!res)
return;
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ next = find_prev(tb, *root, stack, lastkey);
}
-next_root:
if (!iter)
return;
- root = context->root = catree_find_prev_root(iter);
- lastkey = THE_NON_VALUE;
- } while (root);
+ ASSERT(is_value(lastkey));
+ root = catree_find_prev_root(iter, &lastkey);
+ if (!root)
+ return;
+ context->root = root;
+ stack->pos = stack->slot = 0;
+ next = find_prev(tb, *root, stack, lastkey);
+ }
}
static enum ms_key_boundness key_boundness(DbTableCommon *tb,