Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c'):
 erts/emulator/beam/erl_db_tree.c | 3289 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 3289 insertions(+), 0 deletions(-)
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c
new file mode 100644
index 0000000000..d3a916d2d9
--- /dev/null
+++ b/erts/emulator/beam/erl_db_tree.c
@@ -0,0 +1,3289 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1998-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+/*
+** Implementation of ordered ETS tables.
+** The tables are implemented as AVL trees (first published by
+** Adelson-Velsky and Landis). A nice source for learning about these
+** trees is Wirth's Algorithms + Data Structures = Programs.
+** Unlike the examples in Wirth's book, however, the implementation
+** here does not use recursion.
+*/
+
+/*
+#ifdef DEBUG
+#define HARDDEBUG 1
+#endif
+*/
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+#include "erl_vm.h"
+#include "global.h"
+#include "erl_process.h"
+#include "error.h"
+#define ERTS_WANT_DB_INTERNAL__
+#include "erl_db.h"
+#include "bif.h"
+#include "big.h"
+#include "erl_binary.h"
+
+#include "erl_db_tree.h"
+
+
+
+#define GETKEY(dtt, tplp) (*((tplp) + (dtt)->common.keypos))
+#define GETKEY_WITH_POS(Keypos, Tplp) (*((Tplp) + Keypos))
+#define NITEMS(tb) ((int)erts_smp_atomic_read(&(tb)->common.nitems))
+
+/*
+** A stack of this size is enough for an AVL tree with more than
+** 0xFFFFFFFF elements. It may need to change if the datatype of the
+** element counter is changed to a 64-bit integer.
+** The maximal height of an AVL tree is bounded by:
+** h(n) <= 1.4404 * log2(n + 2) - 0.328
+** where n denotes the number of nodes, h(n) the height of a tree
+** with n nodes and log2 is the binary logarithm.
+*/
+
+#define STACK_NEED 50
+#define TREE_MAX_ELEMENTS 0xFFFFFFFFUL
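+
+/*
+** Illustrative check of the bound above (not part of the original source):
+** with n = 0xFFFFFFFF the formula gives
+**   h(n) <= 1.4404 * log2(2^32 + 1) - 0.328 ~= 1.4404 * 32 - 0.328 ~= 45.8
+** so 50 stack entries suffice with a margin of a few levels.
+*/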
+
+#define PUSH_NODE(Dtt, Tdt) \
+ ((Dtt)->array[(Dtt)->pos++] = Tdt)
+
+#define POP_NODE(Dtt) \
+ (((Dtt)->pos) ? \
+ (Dtt)->array[--((Dtt)->pos)] : NULL)
+
+#define TOP_NODE(Dtt) \
+ ((Dtt->pos) ? \
+ (Dtt)->array[(Dtt)->pos - 1] : NULL)
+
+#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL)
+
+
+
+/* Obtain the table's static stack if available, NULL if not.
+** Must be released with release_stack().
+*/
+static DbTreeStack* get_static_stack(DbTableTree* tb)
+{
+ if (!erts_smp_atomic_xchg(&tb->is_stack_busy, 1)) {
+ return &tb->static_stack;
+ }
+ return NULL;
+}
+
+/* Obtain the static stack if available, otherwise an empty dynamically
+** allocated stack. Must be released with release_stack().
+*/
+static DbTreeStack* get_any_stack(DbTableTree* tb)
+{
+ DbTreeStack* stack;
+ if (!erts_smp_atomic_xchg(&tb->is_stack_busy, 1)) {
+ return &tb->static_stack;
+ }
+ stack = erts_db_alloc(ERTS_ALC_T_DB_STK, (DbTable *) tb,
+ sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
+ stack->pos = 0;
+ stack->slot = 0;
+ stack->array = (TreeDbTerm**) (stack + 1);
+ return stack;
+}
+
+static void release_stack(DbTableTree* tb, DbTreeStack* stack)
+{
+ if (stack == &tb->static_stack) {
+ ASSERT(erts_smp_atomic_read(&tb->is_stack_busy) == 1);
+ erts_smp_atomic_set(&tb->is_stack_busy, 0);
+ }
+ else {
+ erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb,
+ (void *) stack, sizeof(DbTreeStack) + sizeof(TreeDbTerm*) * STACK_NEED);
+ }
+}
+
+static void reset_static_stack(DbTableTree* tb)
+{
+ tb->static_stack.pos = 0;
+ tb->static_stack.slot = 0;
+}
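+
+/*
+** Summary of the stack handling above: get_static_stack() tries to grab
+** the per-table cached stack with an atomic exchange on is_stack_busy,
+** get_any_stack() falls back to a freshly allocated empty stack when the
+** static one is busy, and both must be paired with release_stack().
+** reset_static_stack() invalidates the cached position whenever the tree
+** is about to be mutated. Typical usage (as in db_next_tree() below):
+**
+**     stack = get_any_stack(tb);
+**     this = find_next(tb, stack, key);
+**     release_stack(tb, stack);
+*/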
+
+
+/*
+** Some macros for "direction stacks"
+*/
+#define DIR_LEFT 0
+#define DIR_RIGHT 1
+#define DIR_END 2
+
+/*
+ * Special binary flag
+ */
+#define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1
+
+/*
+ * Number of records to delete before trapping.
+ */
+#define DELETE_RECORD_LIMIT 12000
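+/*
+ * (Used as the chunk size for do_free_tree_cont() when
+ * db_free_table_continue_tree() frees the table in steps.)
+ */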
+
+/*
+** Debugging
+*/
+#ifdef HARDDEBUG
+static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to);
+static void check_slot_pos(DbTableTree *tb);
+static void check_saved_stack(DbTableTree *tb);
+static int check_table_tree(TreeDbTerm *t);
+
+#define TREE_DEBUG
+#endif
+
+#ifdef TREE_DEBUG
+/*
+** Primitive trace macro
+*/
+#define DBG erts_fprintf(stderr,"%d\n",__LINE__)
+
+/*
+** Debugging dump
+*/
+
+static void do_dump_tree2(int to, void *to_arg, int show, TreeDbTerm *t,
+ int offset);
+
+#else
+
+#define DBG /* nothing */
+
+#endif
+
+/*
+ * Size calculations
+ */
+#define SIZ_OVERHEAD ((sizeof(TreeDbTerm)/sizeof(Eterm)) - 1)
+#define SIZ_DBTERM(TDT) (SIZ_OVERHEAD + (TDT)->dbterm.size)
+
+/*
+** Datatypes
+*/
+
+/*
+ * This structure is filled in by analyze_pattern() for the select
+ * functions.
+ */
+struct mp_info {
+ int all_objects; /* True if complete objects are always
+ * returned from the match_spec (can use
+ * copy_shallow on the return value) */
+ int something_can_match; /* The match_spec is not "impossible" */
+ int some_limitation; /* There is some limitation on the search
+ * area, i. e. least and/or most is set.*/
+ int got_partial; /* The limitation has a partially bound
+ * key */
+ Eterm least; /* The lowest matching key (possibly
+ * partially bound expression) */
+ Eterm most; /* The highest matching key (possibly
+ * partially bound expression) */
+
+ TreeDbTerm *save_term; /* If the key is completely bound, this
+ * will be the Tree node we're searching
+ * for, otherwise it will be useless */
+ Binary *mp; /* The compiled match program */
+};
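+
+/*
+ * Illustrative example (not from the original source): with keypos 1,
+ * a match head such as {foo, '$1'} has a completely bound key, so
+ * least == most == foo and save_term can be set to the node searched
+ * for; a head such as {{foo, '$1'}, '$2'} only has a partly bound key,
+ * so got_partial is set and least/most merely delimit the key range
+ * that the traversal has to cover.
+ */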
+
+/*
+ * Used by doit_select(_chunk)
+ */
+struct select_context {
+ Process *p;
+ Eterm accum;
+ Binary *mp;
+ Eterm end_condition;
+ Eterm *lastobj;
+ Sint32 max;
+ int keypos;
+ int all_objects;
+ Sint got;
+ Sint chunk_size;
+};
+
+/*
+ * Used by doit_select_count
+ */
+struct select_count_context {
+ Process *p;
+ Binary *mp;
+ Eterm end_condition;
+ Eterm *lastobj;
+ Sint32 max;
+ int keypos;
+ int all_objects;
+ Sint got;
+};
+
+/*
+ * Used by doit_select_delete
+ */
+struct select_delete_context {
+ Process *p;
+ DbTableTree *tb;
+ Uint accum;
+ Binary *mp;
+ Eterm end_condition;
+ int erase_lastterm;
+ TreeDbTerm *lastterm;
+ Sint32 max;
+ int keypos;
+};
+
+/*
+** Forward declarations
+*/
+static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key);
+static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
+ Eterm object);
+static int do_free_tree_cont(DbTableTree *tb, int num_left);
+static TreeDbTerm* get_term(DbTableTree *tb,
+ TreeDbTerm* old,
+ Eterm obj);
+static void free_term(DbTableTree *tb, TreeDbTerm* p);
+static int balance_left(TreeDbTerm **this);
+static int balance_right(TreeDbTerm **this);
+static int delsub(TreeDbTerm **this);
+static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot);
+static TreeDbTerm *find_node(DbTableTree *tb, Eterm key);
+static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key);
+static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key);
+static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key);
+static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*,
+ Eterm key);
+static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack*,
+ Eterm key);
+static void traverse_backwards(DbTableTree *tb,
+ DbTreeStack*,
+ Eterm lastkey,
+ int (*doit)(DbTableTree *tb,
+ TreeDbTerm *,
+ void *,
+ int),
+ void *context);
+static void traverse_forward(DbTableTree *tb,
+ DbTreeStack*,
+ Eterm lastkey,
+ int (*doit)(DbTableTree *tb,
+ TreeDbTerm *,
+ void *,
+ int),
+ void *context);
+static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret,
+ Eterm *partly_bound_key);
+static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key);
+static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done);
+
+static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+ struct mp_info *mpi);
+static int doit_select(DbTableTree *tb,
+ TreeDbTerm *this,
+ void *ptr,
+ int forward);
+static int doit_select_count(DbTableTree *tb,
+ TreeDbTerm *this,
+ void *ptr,
+ int forward);
+static int doit_select_chunk(DbTableTree *tb,
+ TreeDbTerm *this,
+ void *ptr,
+ int forward);
+static int doit_select_delete(DbTableTree *tb,
+ TreeDbTerm *this,
+ void *ptr,
+ int forward);
+static void do_dump_tree(int to, void *to_arg, TreeDbTerm *t);
+
+static int partly_bound_can_match_lesser(Eterm partly_bound_1,
+ Eterm partly_bound_2);
+static int partly_bound_can_match_greater(Eterm partly_bound_1,
+ Eterm partly_bound_2);
+static int do_partly_bound_can_match_lesser(Eterm a, Eterm b,
+ int *done);
+static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
+ int *done);
+static BIF_RETTYPE ets_select_reverse(Process *p, Eterm a1,
+ Eterm a2, Eterm a3);
+
+/* Method interface functions */
+static int db_first_tree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_next_tree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_last_tree(Process *p, DbTable *tbl,
+ Eterm *ret);
+static int db_prev_tree(Process *p, DbTable *tbl,
+ Eterm key,
+ Eterm *ret);
+static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail);
+static int db_get_tree(Process *p, DbTable *tbl,
+ Eterm key, Eterm *ret);
+static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_get_element_tree(Process *p, DbTable *tbl,
+ Eterm key,int ndex,
+ Eterm *ret);
+static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret);
+static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret);
+static int db_slot_tree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret);
+static int db_select_tree(Process *p, DbTable *tbl,
+ Eterm pattern, int reversed, Eterm *ret);
+static int db_select_count_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Eterm *ret);
+static int db_select_chunk_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Sint chunk_size,
+ int reversed, Eterm *ret);
+static int db_select_continue_tree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_count_continue_tree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static int db_select_delete_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Eterm *ret);
+static int db_select_delete_continue_tree(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+static void db_print_tree(int to, void *to_arg,
+ int show, DbTable *tbl);
+static int db_free_table_tree(DbTable *tbl);
+
+static int db_free_table_continue_tree(DbTable *tbl);
+
+static void db_foreach_offheap_tree(DbTable *,
+ void (*)(ErlOffHeap *, void *),
+ void *);
+
+static int db_delete_all_objects_tree(Process* p, DbTable* tbl);
+
+#ifdef HARDDEBUG
+static void db_check_table_tree(DbTable *tbl);
+#endif
+static int db_lookup_dbterm_tree(DbTable *, Eterm key, DbUpdateHandle*);
+static void db_finalize_dbterm_tree(DbUpdateHandle*);
+
+/*
+** Static variables
+*/
+
+Export ets_select_reverse_exp;
+
+/*
+** External interface
+*/
+DbTableMethod db_tree =
+{
+ db_create_tree,
+ db_first_tree,
+ db_next_tree,
+ db_last_tree,
+ db_prev_tree,
+ db_put_tree,
+ db_get_tree,
+ db_get_element_tree,
+ db_member_tree,
+ db_erase_tree,
+ db_erase_object_tree,
+ db_slot_tree,
+ db_select_chunk_tree,
+ db_select_tree, /* why not chunk size=0 ??? */
+ db_select_delete_tree,
+ db_select_continue_tree,
+ db_select_delete_continue_tree,
+ db_select_count_tree,
+ db_select_count_continue_tree,
+ db_delete_all_objects_tree,
+ db_free_table_tree,
+ db_free_table_continue_tree,
+ db_print_tree,
+ db_foreach_offheap_tree,
+#ifdef HARDDEBUG
+ db_check_table_tree,
+#else
+ NULL,
+#endif
+ db_lookup_dbterm_tree,
+ db_finalize_dbterm_tree
+
+};
+
+
+
+
+
+void db_initialize_tree(void)
+{
+ memset(&ets_select_reverse_exp, 0, sizeof(Export));
+ ets_select_reverse_exp.address =
+ &ets_select_reverse_exp.code[3];
+ ets_select_reverse_exp.code[0] = am_ets;
+ ets_select_reverse_exp.code[1] = am_reverse;
+ ets_select_reverse_exp.code[2] = 3;
+ ets_select_reverse_exp.code[3] =
+ (Eterm) em_apply_bif;
+ ets_select_reverse_exp.code[4] =
+ (Eterm) &ets_select_reverse;
+ return;
+}
+
+/*
+** Table interface routines, i.e. what is called by the BIFs
+*/
+
+int db_create_tree(Process *p, DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+ tb->root = NULL;
+ tb->static_stack.array = erts_db_alloc(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ tb->static_stack.pos = 0;
+ tb->static_stack.slot = 0;
+ erts_smp_atomic_init(&tb->is_stack_busy, 0);
+ tb->deletion = 0;
+ return DB_ERROR_NONE;
+}
+
+static int db_first_tree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ TreeDbTerm *this;
+ Eterm e;
+ Eterm *hp;
+ Uint sz;
+
+ if (( this = tb->root ) == NULL) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+    /* Walk down the tree to the left */
+ if ((stack = get_static_stack(tb)) != NULL) {
+ stack->pos = stack->slot = 0;
+ }
+ while (this->left != NULL) {
+ if (stack) PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ if (stack) {
+ PUSH_NODE(stack, this);
+ stack->slot = 1;
+ release_stack(tb,stack);
+ }
+ e = GETKEY(tb, this->dbterm.tpl);
+ sz = size_object(e);
+
+ hp = HAlloc(p, sz);
+
+ *ret = copy_struct(e,sz,&hp,&MSO(p));
+
+ return DB_ERROR_NONE;
+}
+
+static int db_next_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ TreeDbTerm *this;
+ Eterm e;
+ Eterm *hp;
+ Uint sz;
+
+ if (is_atom(key) && key == am_EOT)
+ return DB_ERROR_BADKEY;
+ stack = get_any_stack(tb);
+ this = find_next(tb, stack, key);
+ release_stack(tb,stack);
+ if (this == NULL) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ e = GETKEY(tb, this->dbterm.tpl);
+ sz = size_object(e);
+
+ hp = HAlloc(p, sz);
+
+ *ret = copy_struct(e,sz,&hp,&MSO(p));
+
+ return DB_ERROR_NONE;
+}
+
+static int db_last_tree(Process *p, DbTable *tbl, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ TreeDbTerm *this;
+ DbTreeStack* stack;
+ Eterm e;
+ Eterm *hp;
+ Uint sz;
+
+ if (( this = tb->root ) == NULL) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+    /* Walk down the tree to the right */
+ if ((stack = get_static_stack(tb)) != NULL) {
+ stack->pos = stack->slot = 0;
+ }
+ while (this->right != NULL) {
+ if (stack) PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ if (stack) {
+ PUSH_NODE(stack, this);
+ stack->slot = NITEMS(tb);
+ release_stack(tb,stack);
+ }
+ e = GETKEY(tb, this->dbterm.tpl);
+ sz = size_object(e);
+
+ hp = HAlloc(p, sz);
+
+ *ret = copy_struct(e,sz,&hp,&MSO(p));
+
+ return DB_ERROR_NONE;
+}
+
+static int db_prev_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ TreeDbTerm *this;
+ DbTreeStack* stack;
+ Eterm e;
+ Eterm *hp;
+ Uint sz;
+
+ if (is_atom(key) && key == am_EOT)
+ return DB_ERROR_BADKEY;
+ stack = get_any_stack(tb);
+ this = find_prev(tb, stack, key);
+ release_stack(tb,stack);
+ if (this == NULL) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ e = GETKEY(tb, this->dbterm.tpl);
+ sz = size_object(e);
+
+ hp = HAlloc(p, sz);
+
+ *ret = copy_struct(e,sz,&hp,&MSO(p));
+
+ return DB_ERROR_NONE;
+}
+
+static int db_put_tree(DbTable *tbl, Eterm obj, int key_clash_fail)
+{
+ DbTableTree *tb = &tbl->tree;
+    /* Non-recursive insertion in an AVL tree, building our own stack */
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm **this = &tb->root;
+ Sint c;
+ Eterm key;
+ int dir;
+ TreeDbTerm *p1, *p2, *p;
+
+ key = GETKEY(tb, tuple_val(obj));
+
+ reset_static_stack(tb);
+
+ dstack[dpos++] = DIR_END;
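+    /* Walk down from the root, recording in dstack the direction taken
+     * at each step and in tstack the address of the child pointer that
+     * was followed; the loop after the insertion then unwinds this path
+     * and rebalances the tree bottom-up without recursion. */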
+ for (;;)
+ if (!*this) { /* Found our place */
+ state = 1;
+ if (erts_smp_atomic_inctest(&tb->common.nitems) >= TREE_MAX_ELEMENTS) {
+ erts_smp_atomic_dec(&tb->common.nitems);
+ return DB_ERROR_SYSRES;
+ }
+ *this = get_term(tb, NULL, obj);
+ (*this)->balance = 0;
+ (*this)->left = (*this)->right = NULL;
+ break;
+ } else if ((c = cmp(key,GETKEY(tb,(*this)->dbterm.tpl))) < 0) {
+ /* go left */
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else if (c > 0) { /* go right */
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ } else if (!key_clash_fail) { /* Equal key and this is a set, replace. */
+ *this = get_term(tb, *this, obj);
+ break;
+ } else {
+ return DB_ERROR_BADKEY; /* key already exists */
+ }
+
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ p = *this;
+ if (dir == DIR_LEFT) {
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ break;
+ case -1: /* The icky case */
+ p1 = p->left;
+ if (p1->balance == -1) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ p->balance = 0;
+ (*this) = p1;
+	    } else { /* Double LR rotation */
+ p2 = p1->right;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (p2->balance == -1) ? +1 : 0;
+ p1->balance = (p2->balance == 1) ? -1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ } else { /* dir == DIR_RIGHT */
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ state = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ break;
+ case 1:
+ p1 = p->right;
+ if (p1->balance == 1) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ p->balance = 0;
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (p2->balance == 1) ? -1 : 0;
+ p1->balance = (p2->balance == -1) ? 1 : 0;
+ (*this) = p2;
+ }
+ (*this)->balance = 0;
+ state = 0;
+ break;
+ }
+ }
+ }
+ return DB_ERROR_NONE;
+}
+
+static int db_get_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ Eterm copy;
+ Eterm *hp;
+ TreeDbTerm *this;
+
+ /*
+ * This is always a set, so we know exactly how large
+ * the data is when we have found it.
+ * The list created around it is purely for interface conformance.
+ */
+
+ this = find_node(tb,key);
+ if (this == NULL) {
+ *ret = NIL;
+ } else {
+ hp = HAlloc(p, this->dbterm.size + 2);
+ copy = copy_shallow(DBTERM_BUF(&this->dbterm),
+ this->dbterm.size,
+ &hp,
+ &MSO(p));
+ *ret = CONS(hp, copy, NIL);
+ }
+ return DB_ERROR_NONE;
+}
+
+static int db_member_tree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+
+ *ret = (find_node(tb,key) == NULL) ? am_false : am_true;
+ return DB_ERROR_NONE;
+}
+
+static int db_get_element_tree(Process *p, DbTable *tbl,
+ Eterm key, int ndex, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ /*
+ * Look the node up:
+ */
+ Eterm *hp;
+ TreeDbTerm *this;
+
+ /*
+ * This is always a set, so we know exactly how large
+ * the data is when we have found it.
+     * No list is created around elements in sets, so there is no list
+ * around the element here either.
+ */
+
+ this = find_node(tb,key);
+ if (this == NULL) {
+ return DB_ERROR_BADKEY;
+ } else {
+ Eterm element;
+ Uint sz;
+ if (ndex > arityval(this->dbterm.tpl[0])) {
+ return DB_ERROR_BADPARAM;
+ }
+ element = this->dbterm.tpl[ndex];
+ sz = size_object(element);
+ hp = HAlloc(p, sz);
+ *ret = copy_struct(element,
+ sz,
+ &hp,
+ &MSO(p));
+ }
+ return DB_ERROR_NONE;
+}
+
+static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ TreeDbTerm *res;
+
+ *ret = am_true;
+
+ if ((res = linkout_tree(tb, key)) != NULL) {
+ free_term(tb, res);
+ }
+ return DB_ERROR_NONE;
+}
+
+static int db_erase_object_tree(DbTable *tbl, Eterm object, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ TreeDbTerm *res;
+
+ *ret = am_true;
+
+ if ((res = linkout_object_tree(tb, object)) != NULL) {
+ free_term(tb, res);
+ }
+ return DB_ERROR_NONE;
+}
+
+
+static int db_slot_tree(Process *p, DbTable *tbl,
+ Eterm slot_term, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ Sint slot;
+ TreeDbTerm *st;
+ Eterm *hp;
+ Eterm copy;
+
+ /*
+ * The notion of a "slot" is not natural in a tree, but we try to
+ * simulate it by giving the n'th node in the tree instead.
+ * Traversing a tree in this way is not very convenient, but by
+     * using the saved stack we will at least sometimes get acceptable
+ * performance.
+ */
+
+ if (is_not_small(slot_term) ||
+ ((slot = signed_val(slot_term)) < 0) ||
+ (slot > NITEMS(tb)))
+ return DB_ERROR_BADPARAM;
+
+ if (slot == NITEMS(tb)) {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+
+ /*
+     * We use the slot position and search from there; slot positions
+ * are counted from 1 and up.
+ */
+ ++slot;
+ st = slot_search(p, tb, slot);
+ if (st == NULL) {
+ *ret = am_false;
+ return DB_ERROR_UNSPEC;
+ }
+ hp = HAlloc(p, st->dbterm.size + 2);
+ copy = copy_shallow(DBTERM_BUF(&st->dbterm),
+ st->dbterm.size,
+ &hp,
+ &MSO(p));
+ *ret = CONS(hp, copy, NIL);
+ return DB_ERROR_NONE;
+}
+
+
+
+static BIF_RETTYPE ets_select_reverse(Process *p, Eterm a1, Eterm a2, Eterm a3)
+{
+ Eterm list;
+ Eterm result;
+ Eterm* hp;
+ Eterm* hend;
+
+ int max_iter = CONTEXT_REDS * 10;
+
+ if (is_nil(a1)) {
+ hp = HAlloc(p, 3);
+ BIF_RET(TUPLE2(hp,a2,a3));
+ } else if (is_not_list(a1)) {
+ error:
+ BIF_ERROR(p, BADARG);
+ }
+
+ list = a1;
+ result = a2;
+ hp = hend = NULL;
+ while (is_list(list)) {
+ Eterm* pair = list_val(list);
+ if (--max_iter == 0) {
+ BUMP_ALL_REDS(p);
+ HRelease(p, hend, hp);
+ BIF_TRAP3(&ets_select_reverse_exp, p, list, result, a3);
+ }
+ if (hp == hend) {
+ hp = HAlloc(p, 64);
+ hend = hp + 64;
+ }
+ result = CONS(hp, CAR(pair), result);
+ hp += 2;
+ list = CDR(pair);
+ }
+ if (is_not_nil(list)) {
+ goto error;
+ }
+ HRelease(p, hend, hp);
+ BUMP_REDS(p,CONTEXT_REDS - max_iter / 10);
+ hp = HAlloc(p,3);
+ BIF_RET(TUPLE2(hp, result, a3));
+}
+
+static BIF_RETTYPE bif_trap1(Export *bif,
+ Process *p,
+ Eterm p1)
+{
+ BIF_TRAP1(bif, p, p1);
+}
+
+static BIF_RETTYPE bif_trap3(Export *bif,
+ Process *p,
+ Eterm p1,
+ Eterm p2,
+ Eterm p3)
+{
+ BIF_TRAP3(bif, p, p1, p2, p3);
+}
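+
+/*
+** The two wrappers above exist so that the tree routines, which are not
+** BIFs themselves, can invoke the BIF_TRAP macros (which return from the
+** current function) and pass the resulting trap value back to their
+** caller through *ret.
+*/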
+
+/*
+** This is called either when the select bif traps or when ets:select/1
+** is called. It does mostly the same as db_select_tree and may in either case
+** trap to itself again (via the ets:select/1 bif).
+** Note that this is common for db_select_tree and db_select_chunk_tree.
+*/
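+/*
+** The continuation built here and by db_select_tree/db_select_chunk_tree
+** is an 8-tuple:
+**   {TableId, LastKey, EndCondition, ChunkSize, MatchProgBin,
+**    AccumulatedResult, Reverse, HowManyGot}
+** which is what is decoded below.
+*/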
+static int db_select_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ struct select_context sc;
+ unsigned sz;
+ Eterm *hp;
+ Eterm lastkey;
+ Eterm end_condition;
+ Binary *mp;
+ Eterm key;
+ Eterm *tptr;
+ Sint chunk_size;
+ Sint reverse;
+
+
+#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
+
+ /* Decode continuation. We know it's a tuple but not the arity or
+ anything else */
+
+ tptr = tuple_val(continuation);
+
+ if (arityval(*tptr) != 8)
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+
+ if (!is_small(tptr[4]) || !is_binary(tptr[5]) ||
+ !(is_list(tptr[6]) || tptr[6] == NIL) || !is_small(tptr[7]) ||
+ !is_small(tptr[8]))
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+
+ lastkey = tptr[2];
+ end_condition = tptr[3];
+ if (!(thing_subtag(*binary_val(tptr[5])) == REFC_BINARY_SUBTAG))
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ mp = ((ProcBin *) binary_val(tptr[5]))->val;
+ if (!IsMatchProgBinary(mp))
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ chunk_size = signed_val(tptr[4]);
+
+ sc.p = p;
+ sc.accum = tptr[6];
+ sc.mp = mp;
+ sc.end_condition = NIL;
+ sc.lastobj = NULL;
+ sc.max = 1000;
+ sc.keypos = tb->common.keypos;
+ sc.all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS;
+ sc.chunk_size = chunk_size;
+ reverse = unsigned_val(tptr[7]);
+ sc.got = signed_val(tptr[8]);
+
+ stack = get_any_stack(tb);
+ if (chunk_size) {
+ if (reverse) {
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
+ } else {
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
+ }
+ } else {
+ if (reverse) {
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+ } else {
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
+ }
+ }
+ release_stack(tb,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+
+ if (sc.max > 0 || (chunk_size && sc.got == chunk_size)) {
+ if (chunk_size) {
+ Eterm *hp;
+ unsigned sz;
+
+ if (sc.got < chunk_size || sc.lastobj == NULL) {
+ /* end of table, sc.lastobj may be NULL as we may have been
+ at the very last object in the table when trapping. */
+ if (!sc.got) {
+ RET_TO_BIF(am_EOT, DB_ERROR_NONE);
+ } else {
+ RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
+ sc.accum, NIL, am_EOT),
+ DB_ERROR_NONE);
+ }
+ }
+
+ key = GETKEY(tb, sc.lastobj);
+
+ sz = size_object(key);
+ hp = HAlloc(p, 9 + sz);
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ continuation = TUPLE8
+ (hp,
+ tptr[1],
+ key,
+ tptr[3],
+ tptr[4],
+ tptr[5],
+ NIL,
+ tptr[7],
+ make_small(0));
+ RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
+ sc.accum, NIL, continuation),
+ DB_ERROR_NONE);
+ } else {
+ RET_TO_BIF(sc.accum, DB_ERROR_NONE);
+ }
+ }
+ key = GETKEY(tb, sc.lastobj);
+ if (chunk_size) {
+ if (end_condition != NIL &&
+ ((!reverse && cmp_partly_bound(end_condition,key) < 0) ||
+ (reverse && cmp_partly_bound(end_condition,key) > 0))) {
+ /* done anyway */
+ if (!sc.got) {
+ RET_TO_BIF(am_EOT, DB_ERROR_NONE);
+ } else {
+ RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
+ sc.accum, NIL, am_EOT),
+ DB_ERROR_NONE);
+ }
+ }
+ } else {
+ if (end_condition != NIL &&
+ ((!reverse && cmp_partly_bound(end_condition,key) > 0) ||
+ (reverse && cmp_partly_bound(end_condition,key) < 0))) {
+ /* done anyway */
+ RET_TO_BIF(sc.accum,DB_ERROR_NONE);
+ }
+ }
+ /* Not done yet, let's trap. */
+ sz = size_object(key);
+ hp = HAlloc(p, 9 + sz);
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ continuation = TUPLE8
+ (hp,
+ tptr[1],
+ key,
+ tptr[3],
+ tptr[4],
+ tptr[5],
+ sc.accum,
+ tptr[7],
+ make_small(sc.got));
+ RET_TO_BIF(bif_trap1(bif_export[BIF_ets_select_1], p, continuation),
+ DB_ERROR_NONE);
+
+#undef RET_TO_BIF
+}
+
+
+static int db_select_tree(Process *p, DbTable *tbl,
+ Eterm pattern, int reverse, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ struct select_context sc;
+ struct mp_info mpi;
+ Eterm lastkey = NIL;
+ Eterm key;
+ Eterm continuation;
+ unsigned sz;
+ Eterm *hp;
+ TreeDbTerm *this;
+ int errcode;
+ Eterm mpb;
+
+
+#define RET_TO_BIF(Term,RetVal) do { \
+ if (mpi.mp != NULL) { \
+ erts_bin_free(mpi.mp); \
+ } \
+ *ret = (Term); \
+ return RetVal; \
+ } while(0)
+
+ mpi.mp = NULL;
+
+ sc.accum = NIL;
+ sc.lastobj = NULL;
+ sc.p = p;
+ sc.max = 1000;
+ sc.end_condition = NIL;
+ sc.keypos = tb->common.keypos;
+ sc.got = 0;
+ sc.chunk_size = 0;
+
+ if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
+ RET_TO_BIF(NIL,errcode);
+ }
+
+ if (!mpi.something_can_match) {
+ RET_TO_BIF(NIL,DB_ERROR_NONE);
+ /* can't possibly match anything */
+ }
+
+ sc.mp = mpi.mp;
+ sc.all_objects = mpi.all_objects;
+
+ if (!mpi.got_partial && mpi.some_limitation &&
+ cmp(mpi.least,mpi.most) == 0) {
+ doit_select(tb,mpi.save_term,&sc,0 /* direction doesn't matter */);
+ RET_TO_BIF(sc.accum,DB_ERROR_NONE);
+ }
+
+ stack = get_any_stack(tb);
+ if (reverse) {
+ if (mpi.some_limitation) {
+ if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.most;
+ }
+
+ traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+ } else {
+ if (mpi.some_limitation) {
+ if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.least;
+ }
+
+ traverse_backwards(tb, stack, lastkey, &doit_select, &sc);
+ }
+ release_stack(tb,stack);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr,"Least: %T\n", mpi.least);
+ erts_fprintf(stderr,"Most: %T\n", mpi.most);
+#endif
+ BUMP_REDS(p, 1000 - sc.max);
+ if (sc.max > 0) {
+ RET_TO_BIF(sc.accum,DB_ERROR_NONE);
+ }
+
+ key = GETKEY(tb, sc.lastobj);
+ sz = size_object(key);
+ hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ if (mpi.all_objects)
+ (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
+ mpb=db_make_mp_binary(p,mpi.mp,&hp);
+
+ continuation = TUPLE8
+ (hp,
+ tb->common.id,
+ key,
+ sc.end_condition, /* From the match program, needn't be copied */
+ make_small(0), /* Chunk size of zero means not chunked to the
+ continuation BIF */
+ mpb,
+ sc.accum,
+ make_small(reverse),
+ make_small(sc.got));
+
+ /* Don't free mpi.mp, so don't use macro */
+ *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation);
+ return DB_ERROR_NONE;
+
+#undef RET_TO_BIF
+
+}
+
+
+/*
+** This is called when the select_count bif traps.
+*/
+static int db_select_count_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ struct select_count_context sc;
+ unsigned sz;
+ Eterm *hp;
+ Eterm lastkey;
+ Eterm end_condition;
+ Binary *mp;
+ Eterm key;
+ Eterm *tptr;
+ Eterm egot;
+
+
+#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
+
+ /* Decode continuation. We know it's a tuple and everything else as
+ this is only called by ourselves */
+
+ /* continuation:
+ {Table, Lastkey, EndCondition, MatchProgBin, HowManyGot}*/
+
+ tptr = tuple_val(continuation);
+
+ if (arityval(*tptr) != 5)
+ erl_exit(1,"Internal error in ets:select_count/1");
+
+ lastkey = tptr[2];
+ end_condition = tptr[3];
+ if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG))
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ mp = ((ProcBin *) binary_val(tptr[4]))->val;
+ if (!IsMatchProgBinary(mp))
+ RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+
+ sc.p = p;
+ sc.mp = mp;
+ sc.end_condition = NIL;
+ sc.lastobj = NULL;
+ sc.max = 1000;
+ sc.keypos = tb->common.keypos;
+ if (is_big(tptr[5])) {
+ sc.got = big_to_uint32(tptr[5]);
+ } else {
+ sc.got = unsigned_val(tptr[5]);
+ }
+
+ stack = get_any_stack(tb);
+ traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
+ release_stack(tb,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+
+ if (sc.max > 0) {
+ RET_TO_BIF(erts_make_integer(sc.got,p), DB_ERROR_NONE);
+ }
+ key = GETKEY(tb, sc.lastobj);
+ if (end_condition != NIL &&
+ (cmp_partly_bound(end_condition,key) > 0)) {
+ /* done anyway */
+ RET_TO_BIF(make_small(sc.got),DB_ERROR_NONE);
+ }
+ /* Not done yet, let's trap. */
+ sz = size_object(key);
+ if (IS_USMALL(0, sc.got)) {
+ hp = HAlloc(p, sz + 6);
+ egot = make_small(sc.got);
+ }
+ else {
+ hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
+ egot = uint_to_big(sc.got, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ continuation = TUPLE5
+ (hp,
+ tptr[1],
+ key,
+ tptr[3],
+ tptr[4],
+ egot);
+ RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation),
+ DB_ERROR_NONE);
+
+#undef RET_TO_BIF
+}
+
+
+static int db_select_count_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ struct select_count_context sc;
+ struct mp_info mpi;
+ Eterm lastkey = NIL;
+ Eterm key;
+ Eterm continuation;
+ unsigned sz;
+ Eterm *hp;
+ TreeDbTerm *this;
+ int errcode;
+ Eterm egot;
+ Eterm mpb;
+
+
+#define RET_TO_BIF(Term,RetVal) do { \
+ if (mpi.mp != NULL) { \
+ erts_bin_free(mpi.mp); \
+ } \
+ *ret = (Term); \
+ return RetVal; \
+ } while(0)
+
+ mpi.mp = NULL;
+
+ sc.lastobj = NULL;
+ sc.p = p;
+ sc.max = 1000;
+ sc.end_condition = NIL;
+ sc.keypos = tb->common.keypos;
+ sc.got = 0;
+
+ if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
+ RET_TO_BIF(NIL,errcode);
+ }
+
+ if (!mpi.something_can_match) {
+ RET_TO_BIF(make_small(0),DB_ERROR_NONE);
+ /* can't possibly match anything */
+ }
+
+ sc.mp = mpi.mp;
+ sc.all_objects = mpi.all_objects;
+
+ if (!mpi.got_partial && mpi.some_limitation &&
+ cmp(mpi.least,mpi.most) == 0) {
+ doit_select_count(tb,mpi.save_term,&sc,0 /* dummy */);
+ RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
+ }
+
+ stack = get_any_stack(tb);
+ if (mpi.some_limitation) {
+ if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.least;
+ }
+
+ traverse_backwards(tb, stack, lastkey, &doit_select_count, &sc);
+ release_stack(tb,stack);
+ BUMP_REDS(p, 1000 - sc.max);
+ if (sc.max > 0) {
+ RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE);
+ }
+
+ key = GETKEY(tb, sc.lastobj);
+ sz = size_object(key);
+ if (IS_USMALL(0, sc.got)) {
+ hp = HAlloc(p, sz + PROC_BIN_SIZE + 6);
+ egot = make_small(sc.got);
+ }
+ else {
+ hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6);
+ egot = uint_to_big(sc.got, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ if (mpi.all_objects)
+ (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
+ mpb = db_make_mp_binary(p,mpi.mp,&hp);
+
+ continuation = TUPLE5
+ (hp,
+ tb->common.id,
+ key,
+ sc.end_condition, /* From the match program, needn't be copied */
+ mpb,
+ egot);
+
+ /* Don't free mpi.mp, so don't use macro */
+ *ret = bif_trap1(&ets_select_count_continue_exp, p, continuation);
+ return DB_ERROR_NONE;
+
+#undef RET_TO_BIF
+
+}
+
+static int db_select_chunk_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Sint chunk_size,
+ int reverse,
+ Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ DbTreeStack* stack;
+ struct select_context sc;
+ struct mp_info mpi;
+ Eterm lastkey = NIL;
+ Eterm key;
+ Eterm continuation;
+ unsigned sz;
+ Eterm *hp;
+ TreeDbTerm *this;
+ int errcode;
+ Eterm mpb;
+
+
+#define RET_TO_BIF(Term,RetVal) do { \
+ if (mpi.mp != NULL) { \
+ erts_bin_free(mpi.mp); \
+ } \
+ *ret = (Term); \
+ return RetVal; \
+ } while(0)
+
+ mpi.mp = NULL;
+
+ sc.accum = NIL;
+ sc.lastobj = NULL;
+ sc.p = p;
+ sc.max = 1000;
+ sc.end_condition = NIL;
+ sc.keypos = tb->common.keypos;
+ sc.got = 0;
+ sc.chunk_size = chunk_size;
+
+ if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
+ RET_TO_BIF(NIL,errcode);
+ }
+
+ if (!mpi.something_can_match) {
+ RET_TO_BIF(am_EOT,DB_ERROR_NONE);
+ /* can't possibly match anything */
+ }
+
+ sc.mp = mpi.mp;
+ sc.all_objects = mpi.all_objects;
+
+ if (!mpi.got_partial && mpi.some_limitation &&
+ cmp(mpi.least,mpi.most) == 0) {
+ doit_select(tb,mpi.save_term,&sc, 0 /* direction doesn't matter */);
+ if (sc.accum != NIL) {
+ hp=HAlloc(p, 3);
+ RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE);
+ } else {
+ RET_TO_BIF(am_EOT,DB_ERROR_NONE);
+ }
+ }
+
+ stack = get_any_stack(tb);
+ if (reverse) {
+ if (mpi.some_limitation) {
+ if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.least;
+ }
+
+ traverse_backwards(tb, stack, lastkey, &doit_select_chunk, &sc);
+ } else {
+ if (mpi.some_limitation) {
+ if ((this = find_prev_from_pb_key(tb, stack, mpi.least)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.most;
+ }
+
+ traverse_forward(tb, stack, lastkey, &doit_select_chunk, &sc);
+ }
+ release_stack(tb,stack);
+
+ BUMP_REDS(p, 1000 - sc.max);
+ if (sc.max > 0 || sc.got == chunk_size) {
+ Eterm *hp;
+ unsigned sz;
+
+ if (sc.got < chunk_size ||
+ sc.lastobj == NULL) {
+	    /* We haven't got all we asked for and we haven't trapped,
+	       which should mean we are at the end of the
+	       table; sc.lastobj may be NULL if the table was empty */
+
+ if (!sc.got) {
+ RET_TO_BIF(am_EOT, DB_ERROR_NONE);
+ } else {
+ RET_TO_BIF(bif_trap3(&ets_select_reverse_exp, p,
+ sc.accum, NIL, am_EOT),
+ DB_ERROR_NONE);
+ }
+ }
+
+ key = GETKEY(tb, sc.lastobj);
+ sz = size_object(key);
+ hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ if (mpi.all_objects)
+ (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
+ mpb = db_make_mp_binary(p,mpi.mp,&hp);
+
+ continuation = TUPLE8
+ (hp,
+ tb->common.id,
+ key,
+ sc.end_condition, /* From the match program,
+ needn't be copied */
+ make_small(chunk_size),
+ mpb,
+ NIL,
+ make_small(reverse),
+ make_small(0));
+ /* Don't let RET_TO_BIF macro free mpi.mp*/
+ *ret = bif_trap3(&ets_select_reverse_exp, p,
+ sc.accum, NIL, continuation);
+ return DB_ERROR_NONE;
+ }
+
+ key = GETKEY(tb, sc.lastobj);
+ sz = size_object(key);
+ hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE);
+ key = copy_struct(key, sz, &hp, &MSO(p));
+
+ if (mpi.all_objects)
+ (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
+ mpb = db_make_mp_binary(p,mpi.mp,&hp);
+ continuation = TUPLE8
+ (hp,
+ tb->common.id,
+ key,
+ sc.end_condition, /* From the match program, needn't be copied */
+ make_small(chunk_size),
+ mpb,
+ sc.accum,
+ make_small(reverse),
+ make_small(sc.got));
+ /* Don't let RET_TO_BIF macro free mpi.mp*/
+ *ret = bif_trap1(bif_export[BIF_ets_select_1], p, continuation);
+ return DB_ERROR_NONE;
+
+#undef RET_TO_BIF
+
+}
+
+/*
+** This is called when select_delete traps
+*/
+static int db_select_delete_continue_tree(Process *p,
+ DbTable *tbl,
+ Eterm continuation,
+ Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ struct select_delete_context sc;
+ unsigned sz;
+ Eterm *hp;
+ Eterm lastkey;
+ Eterm end_condition;
+ Binary *mp;
+ Eterm key;
+ Eterm *tptr;
+ Eterm eaccsum;
+
+
+#define RET_TO_BIF(Term, State) do { \
+ if (sc.erase_lastterm) { \
+ free_term(tb, sc.lastterm); \
+ } \
+ *ret = (Term); \
+ return State; \
+ } while(0);
+
+ /* Decode continuation. We know it's correct, this can only be called
+ by trapping */
+
+ tptr = tuple_val(continuation);
+
+ lastkey = tptr[2];
+ end_condition = tptr[3];
+
+ sc.erase_lastterm = 0; /* Before first RET_TO_BIF */
+ sc.lastterm = NULL;
+
+ mp = ((ProcBin *) binary_val(tptr[4]))->val;
+ sc.p = p;
+ sc.tb = tb;
+ if (is_big(tptr[5])) {
+ sc.accum = big_to_uint32(tptr[5]);
+ } else {
+ sc.accum = unsigned_val(tptr[5]);
+ }
+ sc.mp = mp;
+ sc.end_condition = NIL;
+ sc.max = 1000;
+ sc.keypos = tb->common.keypos;
+
+ ASSERT(!erts_smp_atomic_read(&tb->is_stack_busy));
+ traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+
+ BUMP_REDS(p, 1000 - sc.max);
+
+ if (sc.max > 0) {
+ RET_TO_BIF(erts_make_integer(sc.accum, p), DB_ERROR_NONE);
+ }
+ key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ if (end_condition != NIL &&
+ cmp_partly_bound(end_condition,key) > 0) { /* done anyway */
+ RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
+ }
+ /* Not done yet, let's trap. */
+ sz = size_object(key);
+ if (IS_USMALL(0, sc.accum)) {
+ hp = HAlloc(p, sz + 6);
+ eaccsum = make_small(sc.accum);
+ }
+ else {
+ hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6);
+ eaccsum = uint_to_big(sc.accum, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ continuation = TUPLE5
+ (hp,
+ tptr[1],
+ key,
+ tptr[3],
+ tptr[4],
+ eaccsum);
+ RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation),
+ DB_ERROR_NONE);
+
+#undef RET_TO_BIF
+}
+
+static int db_select_delete_tree(Process *p, DbTable *tbl,
+ Eterm pattern, Eterm *ret)
+{
+ DbTableTree *tb = &tbl->tree;
+ struct select_delete_context sc;
+ struct mp_info mpi;
+ Eterm lastkey = NIL;
+ Eterm key;
+ Eterm continuation;
+ unsigned sz;
+ Eterm *hp;
+ TreeDbTerm *this;
+ int errcode;
+ Eterm mpb;
+ Eterm eaccsum;
+
+#define RET_TO_BIF(Term,RetVal) do { \
+ if (mpi.mp != NULL) { \
+ erts_bin_free(mpi.mp); \
+ } \
+ if (sc.erase_lastterm) { \
+ free_term(tb, sc.lastterm); \
+ } \
+ *ret = (Term); \
+ return RetVal; \
+ } while(0)
+
+ mpi.mp = NULL;
+
+ sc.accum = 0;
+ sc.erase_lastterm = 0;
+ sc.lastterm = NULL;
+ sc.p = p;
+ sc.max = 1000;
+ sc.end_condition = NIL;
+ sc.keypos = tb->common.keypos;
+ sc.tb = tb;
+
+ if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
+ RET_TO_BIF(0,errcode);
+ }
+
+ if (!mpi.something_can_match) {
+ RET_TO_BIF(make_small(0),DB_ERROR_NONE);
+ /* can't possibly match anything */
+ }
+
+ sc.mp = mpi.mp;
+
+ if (!mpi.got_partial && mpi.some_limitation &&
+ cmp(mpi.least,mpi.most) == 0) {
+ doit_select_delete(tb,mpi.save_term,&sc, 0 /* direction doesn't
+ matter */);
+ RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE);
+ }
+
+ if (mpi.some_limitation) {
+ if ((this = find_next_from_pb_key(tb, &tb->static_stack, mpi.most)) != NULL) {
+ lastkey = GETKEY(tb, this->dbterm.tpl);
+ }
+ sc.end_condition = mpi.least;
+ }
+
+ traverse_backwards(tb, &tb->static_stack, lastkey, &doit_select_delete, &sc);
+ BUMP_REDS(p, 1000 - sc.max);
+
+ if (sc.max > 0) {
+ RET_TO_BIF(erts_make_integer(sc.accum,p), DB_ERROR_NONE);
+ }
+
+ key = GETKEY(tb, (sc.lastterm)->dbterm.tpl);
+ sz = size_object(key);
+ if (IS_USMALL(0, sc.accum)) {
+ hp = HAlloc(p, sz + PROC_BIN_SIZE + 6);
+ eaccsum = make_small(sc.accum);
+ }
+ else {
+ hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6);
+ eaccsum = uint_to_big(sc.accum, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+ key = copy_struct(key, sz, &hp, &MSO(p));
+ mpb = db_make_mp_binary(p,mpi.mp,&hp);
+
+ continuation = TUPLE5
+ (hp,
+ tb->common.id,
+ key,
+ sc.end_condition, /* From the match program, needn't be copied */
+ mpb,
+ eaccsum);
+
+ /* Don't free mpi.mp, so don't use macro */
+ if (sc.erase_lastterm) {
+ free_term(tb, sc.lastterm);
+ }
+ *ret = bif_trap1(&ets_select_delete_continue_exp, p, continuation);
+ return DB_ERROR_NONE;
+
+#undef RET_TO_BIF
+
+}
+
+/*
+** Other interface routines (not directly coupled to one bif)
+*/
+
+/* Display tree table contents (for dump) */
+static void db_print_tree(int to, void *to_arg,
+ int show,
+ DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+#ifdef TREE_DEBUG
+ if (show)
+ erts_print(to, to_arg, "\nTree data dump:\n"
+ "------------------------------------------------\n");
+ do_dump_tree2(to, to_arg, show, tb->root, 0);
+ if (show)
+ erts_print(to, to_arg, "\n"
+ "------------------------------------------------\n");
+#else
+ erts_print(to, to_arg, "Ordered set (AVL tree), Elements: %d\n", NITEMS(tb));
+ do_dump_tree(to, to_arg, tb->root);
+#endif
+}
+
+/* release all memory occupied by a single table */
+static int db_free_table_tree(DbTable *tbl)
+{
+ while (!db_free_table_continue_tree(tbl))
+ ;
+ return 1;
+}
+
+static int db_free_table_continue_tree(DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+ int result;
+
+ if (!tb->deletion) {
+ tb->static_stack.pos = 0;
+ tb->deletion = 1;
+ PUSH_NODE(&tb->static_stack, tb->root);
+ }
+ result = do_free_tree_cont(tb, DELETE_RECORD_LIMIT);
+ if (result) { /* Completely done. */
+ erts_db_free(ERTS_ALC_T_DB_STK,
+ (DbTable *) tb,
+ (void *) tb->static_stack.array,
+ sizeof(TreeDbTerm *) * STACK_NEED);
+ ASSERT(erts_smp_atomic_read(&tb->common.memory_size)
+ == sizeof(DbTable));
+ }
+ return result;
+}
+
+static int db_delete_all_objects_tree(Process* p, DbTable* tbl)
+{
+ db_free_table_tree(tbl);
+ db_create_tree(p, tbl);
+ erts_smp_atomic_set(&tbl->tree.common.nitems, 0);
+ return 0;
+}
+
+static void do_db_tree_foreach_offheap(TreeDbTerm *,
+ void (*)(ErlOffHeap *, void *),
+ void *);
+
+static void db_foreach_offheap_tree(DbTable *tbl,
+ void (*func)(ErlOffHeap *, void *),
+ void * arg)
+{
+ do_db_tree_foreach_offheap(tbl->tree.root, func, arg);
+}
+
+
+/*
+** Functions for internal use
+*/
+
+
+static void
+do_db_tree_foreach_offheap(TreeDbTerm *tdbt,
+ void (*func)(ErlOffHeap *, void *),
+ void * arg)
+{
+ if(!tdbt)
+ return;
+ do_db_tree_foreach_offheap(tdbt->left, func, arg);
+ (*func)(&(tdbt->dbterm.off_heap), arg);
+ do_db_tree_foreach_offheap(tdbt->right, func, arg);
+}
+
+static TreeDbTerm *linkout_tree(DbTableTree *tb,
+ Eterm key)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm **this = &tb->root;
+ Sint c;
+ int dir;
+ TreeDbTerm *q = NULL;
+
+ /*
+     * Somewhat complicated: deletion in an AVL tree.
+ * The two helpers balance_left and balance_right are used to
+ * keep the balance. As in insert, we do the stacking ourselves.
+ */
+
+ reset_static_stack(tb);
+ dstack[dpos++] = DIR_END;
+ for (;;) {
+ if (!*this) { /* Failure */
+ return NULL;
+ } else if ((c = cmp(key,GETKEY(tb,(*this)->dbterm.tpl))) < 0) {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else if (c > 0) { /* go right */
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ } else { /* Equal key, found the one to delete*/
+ q = (*this);
+ if (q->right == NULL) {
+ (*this) = q->left;
+ state = 1;
+ } else if (q->left == NULL) {
+ (*this) = q->right;
+ state = 1;
+ } else {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ state = delsub(this);
+ }
+ erts_smp_atomic_dec(&tb->common.nitems);
+ break;
+ }
+ }
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ if (dir == DIR_LEFT) {
+ state = balance_left(this);
+ } else {
+ state = balance_right(this);
+ }
+ }
+ return q;
+}
+
+static TreeDbTerm *linkout_object_tree(DbTableTree *tb,
+ Eterm object)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ int dstack[STACK_NEED+1];
+ int dpos = 0;
+ int state = 0;
+ TreeDbTerm **this = &tb->root;
+ Sint c;
+ int dir;
+ TreeDbTerm *q = NULL;
+ Eterm key;
+
+ /*
+     * Somewhat complicated: deletion in an AVL tree.
+ * The two helpers balance_left and balance_right are used to
+ * keep the balance. As in insert, we do the stacking ourselves.
+ */
+
+
+ key = GETKEY(tb, tuple_val(object));
+
+ reset_static_stack(tb);
+ dstack[dpos++] = DIR_END;
+ for (;;) {
+ if (!*this) { /* Failure */
+ return NULL;
+ } else if ((c = cmp(key,GETKEY(tb,(*this)->dbterm.tpl))) < 0) {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ this = &((*this)->left);
+ } else if (c > 0) { /* go right */
+ dstack[dpos++] = DIR_RIGHT;
+ tstack[tpos++] = this;
+ this = &((*this)->right);
+ } else { /* Equal key, found the only possible matching object*/
+ if (!eq(object,make_tuple((*this)->dbterm.tpl))) {
+ return NULL;
+ }
+ q = (*this);
+ if (q->right == NULL) {
+ (*this) = q->left;
+ state = 1;
+ } else if (q->left == NULL) {
+ (*this) = q->right;
+ state = 1;
+ } else {
+ dstack[dpos++] = DIR_LEFT;
+ tstack[tpos++] = this;
+ state = delsub(this);
+ }
+ erts_smp_atomic_dec(&tb->common.nitems);
+ break;
+ }
+ }
+ while (state && ( dir = dstack[--dpos] ) != DIR_END) {
+ this = tstack[--tpos];
+ if (dir == DIR_LEFT) {
+ state = balance_left(this);
+ } else {
+ state = balance_right(this);
+ }
+ }
+ return q;
+}
+
+/*
+** For the select functions, analyzes the pattern and determines which
+** part of the tree should be searched. Also compiles the match program
+*/
+static int analyze_pattern(DbTableTree *tb, Eterm pattern,
+ struct mp_info *mpi)
+{
+ Eterm lst, tpl, ttpl;
+ Eterm *matches,*guards, *bodies;
+ Eterm sbuff[30];
+ Eterm *buff = sbuff;
+ Eterm *ptpl;
+ int i;
+ int num_heads = 0;
+ Eterm key;
+ Eterm partly_bound;
+ int res;
+ Eterm least = 0;
+ Eterm most = 0;
+
+ mpi->some_limitation = 1;
+ mpi->got_partial = 0;
+ mpi->something_can_match = 0;
+ mpi->mp = NULL;
+ mpi->all_objects = 1;
+ mpi->save_term = NULL;
+
+ for (lst = pattern; is_list(lst); lst = CDR(list_val(lst)))
+ ++num_heads;
+
+    if (lst != NIL) { /* not a proper (nil-terminated) list */
+ return DB_ERROR_BADPARAM;
+ }
+ if (num_heads > 10) {
+ buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3);
+ }
+
+ matches = buff;
+ guards = buff + num_heads;
+ bodies = buff + (num_heads * 2);
+
+ i = 0;
+ for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
+ Eterm body;
+ ttpl = CAR(list_val(lst));
+ if (!is_tuple(ttpl)) {
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_BADPARAM;
+ }
+ ptpl = tuple_val(ttpl);
+ if (ptpl[0] != make_arityval(3U)) {
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_BADPARAM;
+ }
+ matches[i] = tpl = ptpl[1];
+ guards[i] = ptpl[2];
+ bodies[i] = body = ptpl[3];
+ if (!is_list(body) || CDR(list_val(body)) != NIL ||
+ CAR(list_val(body)) != am_DollarUnderscore) {
+ mpi->all_objects = 0;
+ }
+ ++i;
+
+ partly_bound = NIL;
+ res = key_given(tb, tpl, &mpi->save_term, &partly_bound);
+ if ( res >= 0 ) { /* Can match something */
+ key = 0;
+ mpi->something_can_match = 1;
+ if (res > 0) {
+ key = GETKEY(tb,tuple_val(tpl));
+ } else if (partly_bound != NIL) {
+ mpi->got_partial = 1;
+ key = partly_bound;
+ } else {
+ mpi->some_limitation = 0;
+ }
+ if (key != 0) {
+ if (least == 0 ||
+ partly_bound_can_match_lesser(key,least)) {
+ least = key;
+ }
+ if (most == 0 ||
+ partly_bound_can_match_greater(key,most)) {
+ most = key;
+ }
+ }
+ }
+ }
+ mpi->least = least;
+ mpi->most = most;
+
+ /*
+ * It would be nice not to compile the match_spec if nothing could match,
+ * but then the select calls would not fail like they should on bad
+     * match specs that happen to specify non-existent keys etc.
+ */
+ if ((mpi->mp = db_match_compile(matches, guards, bodies,
+ num_heads, DCOMP_TABLE, NULL))
+ == NULL) {
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_BADPARAM;
+ }
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_NONE;
+}
+
+static void do_dump_tree(int to, void *to_arg, TreeDbTerm *t)
+{
+ if (t != NULL) {
+ do_dump_tree(to, to_arg, t->left);
+ erts_print(to, to_arg, "%T\n", make_tuple(t->dbterm.tpl));
+ do_dump_tree(to, to_arg, t->right);
+ }
+}
+
+static void free_term(DbTableTree *tb, TreeDbTerm* p)
+{
+ db_free_term_data(&(p->dbterm));
+ erts_db_free(ERTS_ALC_T_DB_TERM,
+ (DbTable *) tb,
+ (void *) p,
+ SIZ_DBTERM(p)*sizeof(Uint));
+}
+
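+/*
+** Free the tree iteratively using the static stack: children are detached
+** and pushed until a leaf can be freed, and the function returns 0 after
+** num_left nodes so that large tables can be released over several calls
+** (see DELETE_RECORD_LIMIT above), or 1 when the whole tree is gone.
+*/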
+static int do_free_tree_cont(DbTableTree *tb, int num_left)
+{
+ TreeDbTerm *root;
+ TreeDbTerm *p;
+
+ for (;;) {
+ root = POP_NODE(&tb->static_stack);
+ if (root == NULL) break;
+ for (;;) {
+ if ((p = root->left) != NULL) {
+ root->left = NULL;
+ PUSH_NODE(&tb->static_stack, root);
+ root = p;
+ } else if ((p = root->right) != NULL) {
+ root->right = NULL;
+ PUSH_NODE(&tb->static_stack, root);
+ root = p;
+ } else {
+ free_term(tb, root);
+ if (--num_left > 0) {
+ break;
+ } else {
+ return 0; /* Done enough for now */
+ }
+ }
+ }
+ }
+ return 1;
+}
+
+static TreeDbTerm* get_term(DbTableTree *tb,
+ TreeDbTerm* old,
+ Eterm obj)
+{
+ TreeDbTerm* p = db_get_term((DbTableCommon *) tb,
+ (old != NULL) ? &(old->dbterm) : NULL,
+ ((char *) &(old->dbterm)) - ((char *) old),
+ obj);
+ return p;
+}
+
+/*
+ * Deletion helpers
+ */
+static int balance_left(TreeDbTerm **this)
+{
+ TreeDbTerm *p, *p1, *p2;
+ int b1, b2, h = 1;
+
+ p = *this;
+ switch (p->balance) {
+ case -1:
+ p->balance = 0;
+ break;
+ case 0:
+ p->balance = 1;
+ h = 0;
+ break;
+ case 1:
+ p1 = p->right;
+ b1 = p1->balance;
+ if (b1 >= 0) { /* Single RR rotation */
+ p->right = p1->left;
+ p1->left = p;
+ if (b1 == 0) {
+ p->balance = 1;
+ p1->balance = -1;
+ h = 0;
+ } else {
+ p->balance = p1->balance = 0;
+ }
+ (*this) = p1;
+ } else { /* Double RL rotation */
+ p2 = p1->left;
+ b2 = p2->balance;
+ p1->left = p2->right;
+ p2->right = p1;
+ p->right = p2->left;
+ p2->left = p;
+ p->balance = (b2 == 1) ? -1 : 0;
+ p1->balance = (b2 == -1) ? 1 : 0;
+ p2->balance = 0;
+ (*this) = p2;
+ }
+ break;
+ }
+ return h;
+}
+
+static int balance_right(TreeDbTerm **this)
+{
+ TreeDbTerm *p, *p1, *p2;
+ int b1, b2, h = 1;
+
+ p = *this;
+ switch (p->balance) {
+ case 1:
+ p->balance = 0;
+ break;
+ case 0:
+ p->balance = -1;
+ h = 0;
+ break;
+ case -1:
+ p1 = p->left;
+ b1 = p1->balance;
+ if (b1 <= 0) { /* Single LL rotation */
+ p->left = p1->right;
+ p1->right = p;
+ if (b1 == 0) {
+ p->balance = -1;
+ p1->balance = 1;
+ h = 0;
+ } else {
+ p->balance = p1->balance = 0;
+ }
+ (*this) = p1;
+ } else { /* Double LR rotation */
+ p2 = p1->right;
+ b2 = p2->balance;
+ p1->right = p2->left;
+ p2->left = p1;
+ p->left = p2->right;
+ p2->right = p;
+ p->balance = (b2 == -1) ? 1 : 0;
+ p1->balance = (b2 == 1) ? -1 : 0;
+ p2->balance = 0;
+ (*this) = p2;
+ }
+ }
+ return h;
+}
+
+static int delsub(TreeDbTerm **this)
+{
+ TreeDbTerm **tstack[STACK_NEED];
+ int tpos = 0;
+ TreeDbTerm *q = (*this);
+ TreeDbTerm **r = &(q->left);
+ int h;
+
+ /*
+     * Walk down to the right in the left subtree of the deleted
+     * object and search for a node with no right child (the in-order
+     * predecessor), pick that node out and return it to be put in
+     * the deleted object's place.
+ */
+
+ while ((*r)->right != NULL) {
+ tstack[tpos++] = r;
+ r = &((*r)->right);
+ }
+ *this = *r;
+ *r = (*r)->left;
+ (*this)->left = q->left;
+ (*this)->right = q->right;
+ (*this)->balance = q->balance;
+ tstack[0] = &((*this)->left);
+ h = 1;
+ while (tpos && h) {
+ r = tstack[--tpos];
+ h = balance_right(r);
+ }
+ return h;
+}
+
+/*
+ * Helper for db_slot
+ */
+
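+/*
+ * The search resumes from the slot position recorded in the stack when
+ * one is available and then repeatedly steps to the in-order successor
+ * (or predecessor) until the requested slot is reached, so consecutive
+ * slot lookups are cheap even though a single lookup may cost O(n).
+ */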
+static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot)
+{
+ TreeDbTerm *this;
+ TreeDbTerm *tmp;
+ DbTreeStack* stack = get_any_stack(tb);
+ ASSERT(stack != NULL);
+
+ if (slot == 1) { /* Don't search from where we are if we are
+ looking for the first slot */
+ stack->slot = 0;
+ }
+
+ if (stack->slot == 0) { /* clear stack if slot positions
+ are not recorded */
+ stack->pos = 0;
+ }
+ if (EMPTY_NODE(stack)) {
+ this = tb->root;
+ if (this == NULL)
+ goto done;
+ while (this->left != NULL){
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ stack->slot = 1;
+ }
+ this = TOP_NODE(stack);
+ while (stack->slot != slot && this != NULL) {
+ if (slot > stack->slot) {
+ if (this->right != NULL) {
+ this = this->right;
+ while (this->left != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (this == NULL || this->left == tmp)
+ break;
+ }
+ }
+ ++(stack->slot);
+ } else {
+ if (this->left != NULL) {
+ this = this->left;
+ while (this->right != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ PUSH_NODE(stack, this);
+ } else {
+ for (;;) {
+ tmp = POP_NODE(stack);
+ this = TOP_NODE(stack);
+ if (this == NULL || this->right == tmp)
+ break;
+ }
+ }
+ --(stack->slot);
+ }
+ }
+done:
+ release_stack(tb,stack);
+ return this;
+}
+
+/*
+ * Find next and previous in sort order
+ */
+
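+/*
+ * Both functions first check whether the top of the supplied stack is
+ * the node for the given key; if not, the stack is rebuilt by searching
+ * from the root. They then step to the in-order successor (find_next)
+ * or predecessor (find_prev), keeping the stack (and, when recorded,
+ * the slot count) in sync with the returned node.
+ */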
+static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack* stack, Eterm key)
+{
+ TreeDbTerm *this;
+ TreeDbTerm *tmp;
+ Sint c;
+
+ if(( this = TOP_NODE(stack)) != NULL) {
+ if (!CMP_EQ(GETKEY(tb, this->dbterm.tpl),key)) {
+ /* Start from the beginning */
+ stack->pos = stack->slot = 0;
+ }
+ }
+ if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
+ if (( this = tb->root ) == NULL)
+ return NULL;
+ for (;;) {
+ PUSH_NODE(stack, this);
+ if (( c = cmp(GETKEY(tb, this->dbterm.tpl),key) ) < 0) {
+		if (this->right == NULL) /* We are at the previous
+ and the element does
+ not exist */
+ break;
+ else
+ this = this->right;
+ } else if (c > 0) {
+ if (this->left == NULL) /* Done */
+ return this;
+ else
+ this = this->left;
+ } else
+ break;
+ }
+ }
+ /* The next element from this... */
+ if (this->right != NULL) {
+ this = this->right;
+ PUSH_NODE(stack,this);
+ while (this->left != NULL) {
+ this = this->left;
+ PUSH_NODE(stack, this);
+ }
+ if (stack->slot > 0)
+ ++(stack->slot);
+ } else {
+ do {
+ tmp = POP_NODE(stack);
+ if (( this = TOP_NODE(stack)) == NULL) {
+ stack->slot = 0;
+ return NULL;
+ }
+ } while (this->right == tmp);
+ if (stack->slot > 0)
+ ++(stack->slot);
+ }
+ return this;
+}
+
+static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack* stack, Eterm key)
+{
+ TreeDbTerm *this;
+ TreeDbTerm *tmp;
+ Sint c;
+
+ if(( this = TOP_NODE(stack)) != NULL) {
+ if (!CMP_EQ(GETKEY(tb, this->dbterm.tpl),key)) {
+ /* Start from the beginning */
+ stack->pos = stack->slot = 0;
+ }
+ }
+ if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */
+ if (( this = tb->root ) == NULL)
+ return NULL;
+ for (;;) {
+ PUSH_NODE(stack, this);
+ if (( c = cmp(GETKEY(tb, this->dbterm.tpl),key) ) > 0) {
+ if (this->left == NULL) /* We are at the next
+ and the element does
+ not exist */
+ break;
+ else
+ this = this->left;
+ } else if (c < 0) {
+ if (this->right == NULL) /* Done */
+ return this;
+ else
+ this = this->right;
+ } else
+ break;
+ }
+ }
+ /* The previous element from this... */
+ if (this->left != NULL) {
+ this = this->left;
+ PUSH_NODE(stack,this);
+ while (this->right != NULL) {
+ this = this->right;
+ PUSH_NODE(stack, this);
+ }
+ if (stack->slot > 0)
+ --(stack->slot);
+ } else {
+ do {
+ tmp = POP_NODE(stack);
+ if (( this = TOP_NODE(stack)) == NULL) {
+ stack->slot = 0;
+ return NULL;
+ }
+ } while (this->left == tmp);
+ if (stack->slot > 0)
+ --(stack->slot);
+ }
+ return this;
+}
+
+static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
+ Eterm key)
+{
+ TreeDbTerm *this;
+ TreeDbTerm *tmp;
+ Sint c;
+
+    /* clear the stack, we have to "re-search" */
+ stack->pos = stack->slot = 0;
+ if (( this = tb->root ) == NULL)
+ return NULL;
+ for (;;) {
+ PUSH_NODE(stack, this);
+ if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl)) ) >= 0) {
+ if (this->right == NULL) {
+ do {
+ tmp = POP_NODE(stack);
+ if (( this = TOP_NODE(stack)) == NULL) {
+ return NULL;
+ }
+ } while (this->right == tmp);
+ return this;
+ } else
+ this = this->right;
+ } else /*if (c < 0)*/ {
+ if (this->left == NULL) /* Done */
+ return this;
+ else
+ this = this->left;
+ }
+ }
+}
+
+static TreeDbTerm *find_prev_from_pb_key(DbTableTree *tb, DbTreeStack* stack,
+ Eterm key)
+{
+ TreeDbTerm *this;
+ TreeDbTerm *tmp;
+ Sint c;
+
+    /* reset the stack -- we always re-search from the root */
+ stack->pos = stack->slot = 0;
+ if (( this = tb->root ) == NULL)
+ return NULL;
+ for (;;) {
+ PUSH_NODE(stack, this);
+ if (( c = cmp_partly_bound(key,GETKEY(tb, this->dbterm.tpl)) ) <= 0) {
+ if (this->left == NULL) {
+ do {
+ tmp = POP_NODE(stack);
+ if (( this = TOP_NODE(stack)) == NULL) {
+ return NULL;
+ }
+ } while (this->left == tmp);
+ return this;
+ } else
+ this = this->left;
+	} else /*if (c > 0)*/ {
+ if (this->right == NULL) /* Done */
+ return this;
+ else
+ this = this->right;
+ }
+ }
+}
+
+
+/*
+ * Just look up a node
+ */
+static TreeDbTerm *find_node(DbTableTree *tb, Eterm key)
+{
+ TreeDbTerm *this;
+ Sint res;
+ DbTreeStack* stack = get_static_stack(tb);
+
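+    /* If the cached static stack already has the wanted key on top, use
+       that node directly instead of searching from the root again. */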
+ if(!stack || EMPTY_NODE(stack)
+ || !CMP_EQ(GETKEY(tb, ( this = TOP_NODE(stack) )->dbterm.tpl), key)) {
+
+ this = tb->root;
+ while (this != NULL &&
+ ( res = cmp(key, GETKEY(tb, this->dbterm.tpl)) ) != 0) {
+ if (res < 0)
+ this = this->left;
+ else
+ this = this->right;
+ }
+ }
+ release_stack(tb,stack);
+ return this;
+}
+
+/*
+ * Look up a node and return the address of the node pointer in the tree
+ */
+static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key)
+{
+ TreeDbTerm **this;
+ Sint res;
+
+ this = &tb->root;
+ while ((*this) != NULL &&
+ ( res = cmp(key, GETKEY(tb, (*this)->dbterm.tpl)) ) != 0) {
+ if (res < 0)
+ this = &((*this)->left);
+ else
+ this = &((*this)->right);
+ }
+ if (*this == NULL)
+ return NULL;
+ return this;
+}
+
+static int db_lookup_dbterm_tree(DbTable *tbl, Eterm key, DbUpdateHandle* handle)
+{
+ DbTableTree *tb = &tbl->tree;
+ TreeDbTerm **pp = find_node2(tb, key);
+
+ if (pp == NULL) return 0;
+
+ handle->tb = tbl;
+ handle->dbterm = &(*pp)->dbterm;
+ handle->bp = (void**) pp;
+ handle->new_size = (*pp)->dbterm.size;
+ handle->mustResize = 0;
+ return 1;
+}
+
+static void db_finalize_dbterm_tree(DbUpdateHandle* handle)
+{
+ if (handle->mustResize) {
+ Eterm* top;
+ Eterm copy;
+ DbTerm* newDbTerm;
+ DbTableTree *tb = &handle->tb->tree;
+ TreeDbTerm* oldp = (TreeDbTerm*) *handle->bp;
+ TreeDbTerm* newp = erts_db_alloc(ERTS_ALC_T_DB_TERM,
+ handle->tb,
+ sizeof(TreeDbTerm)+sizeof(Eterm)*(handle->new_size-1));
+ memcpy(newp, oldp, sizeof(TreeDbTerm)-sizeof(DbTerm)); /* copy only tree header */
+ *(handle->bp) = newp;
+ reset_static_stack(tb);
+ newDbTerm = &newp->dbterm;
+
+ newDbTerm->size = handle->new_size;
+ newDbTerm->off_heap.mso = NULL;
+ newDbTerm->off_heap.externals = NULL;
+ #ifndef HYBRID /* FIND ME! */
+ newDbTerm->off_heap.funs = NULL;
+ #endif
+ newDbTerm->off_heap.overhead = 0;
+
+ /* make a flat copy */
+ top = DBTERM_BUF(newDbTerm);
+ copy = copy_struct(make_tuple(handle->dbterm->tpl),
+ handle->new_size,
+ &top, &newDbTerm->off_heap);
+ DBTERM_SET_TPL(newDbTerm,tuple_val(copy));
+
+ db_free_term_data(handle->dbterm);
+ erts_db_free(ERTS_ALC_T_DB_TERM,
+ handle->tb,
+ (void *) (((char *) handle->dbterm) - (sizeof(TreeDbTerm) - sizeof(DbTerm))),
+ sizeof(TreeDbTerm) + sizeof(Eterm)*(handle->dbterm->size-1));
+ }
+#ifdef DEBUG
+ handle->dbterm = 0;
+#endif
+ return;
+}
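+
+/*
+** Illustrative sketch (commented out, not part of the build) of how the
+** lookup/finalize pair above is meant to be used by an update path: look
+** the term up, write the new element values into the handle's dbterm, and
+** if the new term needs more heap words set new_size and mustResize = 1
+** so that the finalize step reallocates and copies it. Only the handle
+** fields set up by db_lookup_dbterm_tree() are referenced here;
+** update_elements() is a placeholder invented for this example.
+**
+DbUpdateHandle h;
+
+if (db_lookup_dbterm_tree(tbl, key, &h)) {
+    update_elements(&h);
+    db_finalize_dbterm_tree(&h);
+}
+**
+*/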
+
+/*
+ * Traverse the tree with a callback function, used by db_match_xxx
+ */
+static void traverse_backwards(DbTableTree *tb,
+ DbTreeStack* stack,
+ Eterm lastkey,
+ int (*doit)(DbTableTree *,
+ TreeDbTerm *,
+ void *,
+ int),
+ void *context)
+{
+ TreeDbTerm *this, *next;
+
+ if (lastkey == NIL) {
+ stack->pos = stack->slot = 0;
+ if (( this = tb->root ) == NULL) {
+ return;
+ }
+ while (this != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->right;
+ }
+ this = TOP_NODE(stack);
+ next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ } else {
+ next = find_prev(tb, stack, lastkey);
+ }
+
+ while ((this = next) != NULL) {
+ next = find_prev(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 0)))
+ return;
+ }
+}
+
+/*
+ * Traverse the tree with a callback function, used by db_match_xxx
+ */
+static void traverse_forward(DbTableTree *tb,
+ DbTreeStack* stack,
+ Eterm lastkey,
+ int (*doit)(DbTableTree *,
+ TreeDbTerm *,
+ void *,
+ int),
+ void *context)
+{
+ TreeDbTerm *this, *next;
+
+ if (lastkey == NIL) {
+ stack->pos = stack->slot = 0;
+ if (( this = tb->root ) == NULL) {
+ return;
+ }
+ while (this != NULL) {
+ PUSH_NODE(stack, this);
+ this = this->left;
+ }
+ this = TOP_NODE(stack);
+ next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ } else {
+ next = find_next(tb, stack, lastkey);
+ }
+
+ while ((this = next) != NULL) {
+ next = find_next(tb, stack, GETKEY(tb, this->dbterm.tpl));
+ if (!((*doit)(tb, this, context, 1)))
+ return;
+ }
+}
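+
+/*
+** Illustrative sketch (commented out, not part of the build): driving
+** traverse_forward() from a select-style caller. The context fields set
+** here are the ones read by doit_select() below; p (the calling process),
+** mp (a compiled match program), lastkey and the iteration budget of 1000
+** are assumptions made for this example only.
+**
+struct select_context sc;
+DbTreeStack *stack;
+
+sc.p = p;
+sc.mp = mp;
+sc.accum = NIL;
+sc.end_condition = NIL;
+sc.lastobj = NULL;
+sc.max = 1000;
+sc.keypos = tb->common.keypos;
+sc.all_objects = 0;
+
+stack = get_any_stack(tb);
+traverse_forward(tb, stack, lastkey, &doit_select, &sc);
+release_stack(tb, stack);
+**
+** If sc.max reached zero before the end of the table was seen, the caller
+** can trap and later continue with lastkey set to
+** GETKEY_WITH_POS(sc.keypos, sc.lastobj).
+*/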
+
+/*
+ * Returns 1 if the key is fully bound in the pattern (and then *ret points
+ * to the object concerned), -1 if nothing can possibly match, and 0
+ * otherwise. A partly bound key is passed back through *partly_bound.
+ */
+static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret,
+ Eterm *partly_bound)
+{
+ TreeDbTerm *this;
+ Eterm key;
+
+ ASSERT(ret != NULL);
+ if (pattern == am_Underscore || db_is_variable(pattern) != -1)
+ return 0;
+ key = db_getkey(tb->common.keypos, pattern);
+ if (is_non_value(key))
+ return -1; /* can't possibly match anything */
+ if (!db_has_variable(key)) { /* Bound key */
+ if (( this = find_node(tb, key) ) == NULL) {
+ return -1;
+ }
+ *ret = this;
+ return 1;
+ } else if (partly_bound != NULL && key != am_Underscore &&
+ db_is_variable(key) < 0)
+ *partly_bound = key;
+
+ return 0;
+}
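+
+/*
+** Illustrative note: a caller typically acts on the three outcomes of
+** key_given() as follows:
+**
+**    1 : the key is fully bound, only the returned *ret object can match;
+**    0 : traverse the tree, and if *partly_bound was set, start at
+**        find_next_from_pb_key()/find_prev_from_pb_key() and stop when
+**        cmp_partly_bound() says the possible key range has been passed;
+**   -1 : the pattern cannot match any object, so the result is empty.
+*/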
+
+
+
+static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done)
+{
+ Eterm* aa;
+ Eterm* bb;
+ Eterm a_hdr;
+ Eterm b_hdr;
+ int i;
+ Sint j;
+
+ /* A variable matches anything */
+ if (is_atom(a) && (a == am_Underscore || (db_is_variable(a) >= 0))) {
+ *done = 1;
+ return 0;
+ }
+ if (a == b)
+ return 0;
+
+ switch (a & _TAG_PRIMARY_MASK) {
+ case TAG_PRIMARY_LIST:
+ if (!is_list(b)) {
+ return cmp(a,b);
+ }
+ aa = list_val(a);
+ bb = list_val(b);
+ while (1) {
+ if ((j = do_cmp_partly_bound(*aa++, *bb++, done)) != 0 || *done)
+ return j;
+ if (*aa==*bb)
+ return 0;
+ if (is_not_list(*aa) || is_not_list(*bb))
+ return do_cmp_partly_bound(*aa, *bb, done);
+ aa = list_val(*aa);
+ bb = list_val(*bb);
+ }
+ case TAG_PRIMARY_BOXED:
+ if ((b & _TAG_PRIMARY_MASK) != TAG_PRIMARY_BOXED) {
+ return cmp(a,b);
+ }
+ a_hdr = ((*boxed_val(a)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
+ b_hdr = ((*boxed_val(b)) & _TAG_HEADER_MASK) >> _TAG_PRIMARY_SIZE;
+ if (a_hdr != b_hdr) {
+ return cmp(a, b);
+ }
+ if (a_hdr == (_TAG_HEADER_ARITYVAL >> _TAG_PRIMARY_SIZE)) {
+ aa = tuple_val(a);
+ bb = tuple_val(b);
+ /* compare the arities */
+ i = arityval(*aa); /* get the arity*/
+ if (i < arityval(*bb)) return(-1);
+ if (i > arityval(*bb)) return(1);
+ while (i--) {
+ if ((j = do_cmp_partly_bound(*++aa, *++bb, done)) != 0
+ || *done)
+ return j;
+ }
+ return 0;
+ }
+ /* Drop through */
+ default:
+ return cmp(a, b);
+ }
+}
+
+static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key)
+{
+ int done = 0;
+ Sint ret = do_cmp_partly_bound(partly_bound_key, bound_key, &done);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr,"\ncmp_partly_bound: %T", partly_bound_key);
+ if (ret < 0)
+ erts_fprintf(stderr," < ");
+ else if (ret > 0)
+ erts_fprintf(stderr," > ");
+ else
+ erts_fprintf(stderr," == ");
+ erts_fprintf(stderr,"%T\n",bound_key);
+#endif
+ return ret;
+}
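+
+/*
+** Worked examples (illustrative only): a '_' or '$N' variable in the
+** partly bound key matches anything at that position, so
+**
+**    cmp_partly_bound({1, '_'},  {1, foo})  == 0
+**    cmp_partly_bound({1, '_'},  {2, foo})   < 0
+**    cmp_partly_bound({2, '$1'}, {1, foo})   > 0
+**
+** This is what lets a select with a partly bound key limit its traversal
+** to a contiguous key range of the tree.
+*/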
+
+/*
+** For partly_bound debugging....
+**
+BIF_RETTYPE ets_testnisse_2(BIF_ALIST_2)
+BIF_ADECL_2
+{
+ Eterm r1 = make_small(partly_bound_can_match_lesser(BIF_ARG_1,
+ BIF_ARG_2));
+ Eterm r2 = make_small(partly_bound_can_match_greater(BIF_ARG_1,
+ BIF_ARG_2));
+ Eterm *hp = HAlloc(BIF_P,3);
+ Eterm ret;
+
+ ret = TUPLE2(hp,r1,r2);
+ BIF_RET(ret);
+}
+**
+*/
+static int partly_bound_can_match_lesser(Eterm partly_bound_1,
+ Eterm partly_bound_2)
+{
+ int done = 0;
+ int ret = do_partly_bound_can_match_lesser(partly_bound_1,
+ partly_bound_2,
+ &done);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr,"\npartly_bound_can_match_lesser: %T",partly_bound_1);
+ if (ret)
+ erts_fprintf(stderr," can match lesser than ");
+ else
+ erts_fprintf(stderr," can not match lesser than ");
+ erts_fprintf(stderr,"%T\n",partly_bound_2);
+#endif
+ return ret;
+}
+
+static int partly_bound_can_match_greater(Eterm partly_bound_1,
+ Eterm partly_bound_2)
+{
+ int done = 0;
+ int ret = do_partly_bound_can_match_greater(partly_bound_1,
+ partly_bound_2,
+ &done);
+#ifdef HARDDEBUG
+ erts_fprintf(stderr,"\npartly_bound_can_match_greater: %T",partly_bound_1);
+ if (ret)
+ erts_fprintf(stderr," can match greater than ");
+ else
+ erts_fprintf(stderr," can not match greater than ");
+ erts_fprintf(stderr,"%T\n",partly_bound_2);
+#endif
+ return ret;
+}
+
+static int do_partly_bound_can_match_lesser(Eterm a, Eterm b,
+ int *done)
+{
+ Eterm* aa;
+ Eterm* bb;
+ Sint i;
+ int j;
+
+ if (is_atom(a) && (a == am_Underscore ||
+ (db_is_variable(a) >= 0))) {
+ *done = 1;
+ if (is_atom(b) && (b == am_Underscore ||
+ (db_is_variable(b) >= 0))) {
+ return 0;
+ } else {
+ return 1;
+ }
+ } else if (is_atom(b) && (b == am_Underscore ||
+ (db_is_variable(b) >= 0))) {
+ *done = 1;
+ return 0;
+ }
+
+ if (a == b)
+ return 0;
+
+ if (not_eq_tags(a,b)) {
+ *done = 1;
+ return (cmp(a, b) < 0) ? 1 : 0;
+ }
+
+ /* we now know that tags are the same */
+ switch (tag_val_def(a)) {
+ case TUPLE_DEF:
+ aa = tuple_val(a);
+ bb = tuple_val(b);
+ /* compare the arities */
+ if (arityval(*aa) < arityval(*bb)) return 1;
+ if (arityval(*aa) > arityval(*bb)) return 0;
+ i = arityval(*aa); /* get the arity*/
+ while (i--) {
+ if ((j = do_partly_bound_can_match_lesser(*++aa, *++bb,
+ done)) != 0
+ || *done)
+ return j;
+ }
+ return 0;
+ case LIST_DEF:
+ aa = list_val(a);
+ bb = list_val(b);
+ while (1) {
+ if ((j = do_partly_bound_can_match_lesser(*aa++, *bb++,
+ done)) != 0
+ || *done)
+ return j;
+ if (*aa==*bb)
+ return 0;
+ if (is_not_list(*aa) || is_not_list(*bb))
+ return do_partly_bound_can_match_lesser(*aa, *bb,
+ done);
+ aa = list_val(*aa);
+ bb = list_val(*bb);
+ }
+ default:
+ if((i = cmp(a, b)) != 0) {
+ *done = 1;
+ }
+ return (i < 0) ? 1 : 0;
+ }
+}
+
+static int do_partly_bound_can_match_greater(Eterm a, Eterm b,
+ int *done)
+{
+ Eterm* aa;
+ Eterm* bb;
+ Sint i;
+ int j;
+
+ if (is_atom(a) && (a == am_Underscore ||
+ (db_is_variable(a) >= 0))) {
+ *done = 1;
+ if (is_atom(b) && (b == am_Underscore ||
+ (db_is_variable(b) >= 0))) {
+ return 0;
+ } else {
+ return 1;
+ }
+ } else if (is_atom(b) && (b == am_Underscore ||
+ (db_is_variable(b) >= 0))) {
+ *done = 1;
+ return 0;
+ }
+
+ if (a == b)
+ return 0;
+
+ if (not_eq_tags(a,b)) {
+ *done = 1;
+ return (cmp(a, b) > 0) ? 1 : 0;
+ }
+
+ /* we now know that tags are the same */
+ switch (tag_val_def(a)) {
+ case TUPLE_DEF:
+ aa = tuple_val(a);
+ bb = tuple_val(b);
+ /* compare the arities */
+ if (arityval(*aa) < arityval(*bb)) return 0;
+ if (arityval(*aa) > arityval(*bb)) return 1;
+ i = arityval(*aa); /* get the arity*/
+ while (i--) {
+ if ((j = do_partly_bound_can_match_greater(*++aa, *++bb,
+ done)) != 0
+ || *done)
+ return j;
+ }
+ return 0;
+ case LIST_DEF:
+ aa = list_val(a);
+ bb = list_val(b);
+ while (1) {
+ if ((j = do_partly_bound_can_match_greater(*aa++, *bb++,
+ done)) != 0
+ || *done)
+ return j;
+ if (*aa==*bb)
+ return 0;
+ if (is_not_list(*aa) || is_not_list(*bb))
+ return do_partly_bound_can_match_greater(*aa, *bb,
+ done);
+ aa = list_val(*aa);
+ bb = list_val(*bb);
+ }
+ default:
+ if((i = cmp(a, b)) != 0) {
+ *done = 1;
+ }
+ return (i > 0) ? 1 : 0;
+ }
+}
+
+/*
+ * Callback functions for the different match functions
+ */
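+
+/*
+** Each doit_* callback below receives the current object, the caller's
+** context struct (as void *) and a flag saying whether the traversal runs
+** in ascending key order (forward != 0). Returning 1 lets the traversal
+** continue; returning 0 stops it, either because the end condition has
+** been passed or because the iteration budget (sc->max) is used up so the
+** calling BIF can trap and continue later.
+*/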
+
+static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+ int forward)
+{
+ struct select_context *sc = (struct select_context *) ptr;
+ Eterm ret;
+ Uint32 dummy;
+
+ sc->lastobj = this->dbterm.tpl;
+
+ if (sc->end_condition != NIL &&
+ ((forward &&
+ cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) < 0) ||
+ (!forward &&
+ cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) > 0))) {
+ return 0;
+ }
+ ret = db_prog_match(sc->p, sc->mp,
+ make_tuple(this->dbterm.tpl),
+ 0, &dummy);
+ if (is_value(ret)) {
+ Uint sz;
+ Eterm *hp;
+ if (sc->all_objects) {
+ hp = HAlloc(sc->p, this->dbterm.size + 2);
+ ret = copy_shallow(DBTERM_BUF(&this->dbterm),
+ this->dbterm.size,
+ &hp,
+ &MSO(sc->p));
+ } else {
+ sz = size_object(ret);
+ hp = HAlloc(sc->p, sz + 2);
+ ret = copy_struct(ret, sz,
+ &hp, &MSO(sc->p));
+ }
+ sc->accum = CONS(hp, ret, sc->accum);
+ }
+ if (MBUF(sc->p)) {
+ /*
+ * Force a trap and GC if a heap fragment was created. Many heap fragments
+ * make the GC slow.
+ */
+ sc->max = 0;
+ }
+ if (--(sc->max) <= 0) {
+ return 0;
+ }
+ return 1;
+}
+
+static int doit_select_count(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+ int forward)
+{
+ struct select_count_context *sc = (struct select_count_context *) ptr;
+ Eterm ret;
+ Uint32 dummy;
+
+ sc->lastobj = this->dbterm.tpl;
+
+ /* Always backwards traversing */
+ if (sc->end_condition != NIL &&
+ (cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) > 0)) {
+ return 0;
+ }
+ ret = db_prog_match(sc->p, sc->mp,
+ make_tuple(this->dbterm.tpl),
+ 0, &dummy);
+ if (ret == am_true) {
+ ++(sc->got);
+ }
+ if (--(sc->max) <= 0) {
+ return 0;
+ }
+ return 1;
+}
+
+static int doit_select_chunk(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+ int forward)
+{
+ struct select_context *sc = (struct select_context *) ptr;
+ Eterm ret;
+ Uint32 dummy;
+
+ sc->lastobj = this->dbterm.tpl;
+
+ if (sc->end_condition != NIL &&
+ ((forward &&
+ cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) < 0) ||
+ (!forward &&
+ cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) > 0))) {
+ return 0;
+ }
+
+ ret = db_prog_match(sc->p, sc->mp,
+ make_tuple(this->dbterm.tpl),
+ 0, &dummy);
+ if (is_value(ret)) {
+ Uint sz;
+ Eterm *hp;
+
+ ++(sc->got);
+ if (sc->all_objects) {
+ hp = HAlloc(sc->p, this->dbterm.size + 2);
+ ret = copy_shallow(DBTERM_BUF(&this->dbterm),
+ this->dbterm.size,
+ &hp,
+ &MSO(sc->p));
+ } else {
+ sz = size_object(ret);
+ hp = HAlloc(sc->p, sz + 2);
+ ret = copy_struct(ret, sz, &hp, &MSO(sc->p));
+ }
+ sc->accum = CONS(hp, ret, sc->accum);
+ }
+ if (MBUF(sc->p)) {
+ /*
+ * Force a trap and GC if a heap fragment was created. Many heap fragments
+ * make the GC slow.
+ */
+ sc->max = 0;
+ }
+ if (--(sc->max) <= 0 || sc->got == sc->chunk_size) {
+ return 0;
+ }
+ return 1;
+}
+
+
+static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr,
+ int forward)
+{
+ struct select_delete_context *sc = (struct select_delete_context *) ptr;
+ Eterm ret;
+ Uint32 dummy;
+ Eterm key;
+
+ if (sc->erase_lastterm)
+ free_term(tb, sc->lastterm);
+ sc->erase_lastterm = 0;
+ sc->lastterm = this;
+
+ if (sc->end_condition != NIL &&
+ cmp_partly_bound(sc->end_condition,
+ GETKEY_WITH_POS(sc->keypos,
+ this->dbterm.tpl)) > 0)
+ return 0;
+ ret = db_prog_match(sc->p, sc->mp,
+ make_tuple(this->dbterm.tpl),
+ 0, &dummy);
+ if (ret == am_true) {
+ key = GETKEY(sc->tb, this->dbterm.tpl);
+ linkout_tree(sc->tb, key);
+ sc->erase_lastterm = 1;
+ ++sc->accum;
+ }
+ if (--(sc->max) <= 0) {
+ return 0;
+ }
+ return 1;
+}
+
+#ifdef TREE_DEBUG
+static void do_dump_tree2(int to, void *to_arg, int show, TreeDbTerm *t,
+ int offset)
+{
+ if (t == NULL)
+	return;
+ do_dump_tree2(to, to_arg, show, t->right, offset + 4);
+ if (show) {
+	erts_print(to, to_arg, "%*s%T (addr = %p, bal = %d)\n",
+ offset, "", make_tuple(t->dbterm.tpl),
+ t, t->balance);
+ }
+ do_dump_tree2(to, to_arg, show, t->left, offset + 4);
+}
+
+#endif
+
+#ifdef HARDDEBUG
+
+void db_check_table_tree(DbTable *tbl)
+{
+ DbTableTree *tb = &tbl->tree;
+ check_table_tree(tb->root);
+ check_saved_stack(tb);
+ check_slot_pos(tb);
+}
+
+static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to)
+{
+ TreeDbTerm *tmp;
+ if (t == NULL)
+ return NULL;
+ tmp = traverse_until(t->left, current, to);
+ if (tmp != NULL)
+ return tmp;
+ ++(*current);
+ if (*current == to)
+ return t;
+ return traverse_until(t->right, current, to);
+}
+
+static void check_slot_pos(DbTableTree *tb)
+{
+ int pos = 0;
+ TreeDbTerm *t;
+    if (tb->static_stack.slot == 0 || tb->static_stack.pos == 0)
+	return;
+    t = traverse_until(tb->root, &pos, tb->static_stack.slot);
+    if (t != tb->static_stack.array[tb->static_stack.pos - 1]) {
+	erts_fprintf(stderr, "Slot position does not correspond with stack, "
+		     "element position %d is really 0x%08X, when stack says "
+		     "it's 0x%08X\n", tb->static_stack.slot, t,
+		     tb->static_stack.array[tb->static_stack.pos - 1]);
+ do_dump_tree2(ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ }
+}
+
+
+static void check_saved_stack(DbTableTree *tb)
+{
+ TreeDbTerm *t = tb->root;
+ DbTreeStack* stack = &tb->static_stack;
+ int n = 0;
+ if (stack->pos == 0)
+ return;
+ if (t != stack->array[0]) {
+ erts_fprintf(stderr,"tb->stack[0] is 0x%08X, should be 0x%08X\n",
+ stack->array[0], t);
+ do_dump_tree2(ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ return;
+ }
+ while (n < stack->pos) {
+ if (t == NULL) {
+ erts_fprintf(stderr, "NULL pointer in tree when stack not empty,"
+ " stack depth is %d\n", n);
+ do_dump_tree2(ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ return;
+ }
+ n++;
+ if (n < stack->pos) {
+ if (stack->array[n] == t->left)
+ t = t->left;
+ else if (stack->array[n] == t->right)
+ t = t->right;
+ else {
+	    erts_fprintf(stderr, "stack->array[%d] == 0x%08X does not "
+			 "represent a child pointer in the tree! "
+			 "(left == 0x%08X, right == 0x%08X)\n",
+			 n, stack->array[n], t->left, t->right);
+ do_dump_tree2(ERTS_PRINT_STDERR, NULL, 1, tb->root, 0);
+ return;
+ }
+ }
+ }
+}
+
+static int check_table_tree(TreeDbTerm *t)
+{
+ int lh, rh;
+ if (t == NULL)
+ return 0;
+ lh = check_table_tree(t->left);
+ rh = check_table_tree(t->right);
+ if ((rh - lh) != t->balance) {
+ erts_fprintf(stderr, "Invalid tree balance for this node:\n");
+ erts_fprintf(stderr,"balance = %d, left = 0x%08X, right = 0x%08X\n"
+ "data = %T",
+ t->balance, t->left, t->right,
+ make_tuple(t->dbterm.tpl));
+ erts_fprintf(stderr,"\nDump:\n---------------------------------\n");
+ do_dump_tree2(ERTS_PRINT_STDERR, NULL, 1, t, 0);
+ erts_fprintf(stderr,"\n---------------------------------\n");
+ }
+ return ((rh > lh) ? rh : lh) + 1;
+}
+
+#endif