diff options
Diffstat (limited to 'erts/emulator/beam/erl_db_tree.c')
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 567 |
1 files changed, 479 insertions, 88 deletions
diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index 02d211a4bb..d7deadacf0 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1998-2016. All Rights Reserved. + * Copyright Ericsson AB 1998-2017. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -76,9 +76,18 @@ ((Dtt->pos) ? \ (Dtt)->array[(Dtt)->pos - 1] : NULL) -#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL) +#define TOPN_NODE(Dtt, Pos) \ + (((Pos) < Dtt->pos) ? \ + (Dtt)->array[(Dtt)->pos - ((Pos) + 1)] : NULL) + +#define REPLACE_TOP_NODE(Dtt, Node) \ + if ((Dtt)->pos) (Dtt)->array[(Dtt)->pos - 1] = (Node) +#define EMPTY_NODE(Dtt) (TOP_NODE(Dtt) == NULL) +#ifndef MIN +#define MIN(X, Y) ((X) < (Y) ? (X) : (Y)) +#endif /* Obtain table static stack if available. NULL if not. ** Must be released with release_stack() @@ -180,7 +189,6 @@ static ERTS_INLINE TreeDbTerm* replace_dbterm(DbTableTree *tb, TreeDbTerm* old, static TreeDbTerm *traverse_until(TreeDbTerm *t, int *current, int to); static void check_slot_pos(DbTableTree *tb); static void check_saved_stack(DbTableTree *tb); -static int check_table_tree(DbTableTree* tb, TreeDbTerm *t); #define TREE_DEBUG #endif @@ -226,9 +234,9 @@ struct mp_info { Eterm most; /* The highest matching key (possibly * partially bound expression) */ - TreeDbTerm *save_term; /* If the key is completely bound, this - * will be the Tree node we're searching - * for, otherwise it will be useless */ + TreeDbTerm **save_term; /* If the key is completely bound, this + * will be the Tree node we're searching + * for, otherwise it will be useless */ Binary *mp; /* The compiled match program */ }; @@ -278,12 +286,30 @@ struct select_delete_context { }; /* + * Used by doit_select_replace + */ +struct select_replace_context { + Process *p; + DbTableTree *tb; + Binary *mp; + Eterm end_condition; + Eterm *lastobj; + Sint32 max; + int keypos; + int all_objects; + Sint replaced; +}; + +/* Used by select_replace on analyze_pattern */ +typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body); + +/* ** Forward declarations */ static TreeDbTerm *linkout_tree(DbTableTree *tb, Eterm key); static TreeDbTerm *linkout_object_tree(DbTableTree *tb, Eterm object); -static int do_free_tree_cont(DbTableTree *tb, int num_left); +static SWord do_free_tree_continue(DbTableTree *tb, SWord reds); static void free_term(DbTableTree *tb, TreeDbTerm* p); static int balance_left(TreeDbTerm **this); static int balance_right(TreeDbTerm **this); @@ -291,6 +317,7 @@ static int delsub(TreeDbTerm **this); static TreeDbTerm *slot_search(Process *p, DbTableTree *tb, Sint slot); static TreeDbTerm *find_node(DbTableTree *tb, Eterm key); static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key); +static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack*, TreeDbTerm *this); static TreeDbTerm *find_next(DbTableTree *tb, DbTreeStack*, Eterm key); static TreeDbTerm *find_prev(DbTableTree *tb, DbTreeStack*, Eterm key); static TreeDbTerm *find_next_from_pb_key(DbTableTree *tb, DbTreeStack*, @@ -312,14 +339,23 @@ static void traverse_forward(DbTableTree *tb, TreeDbTerm *, void *, int), - void *context); -static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret, + void *context); +static void traverse_update_backwards(DbTableTree *tb, + DbTreeStack*, + Eterm lastkey, + int (*doit)(DbTableTree *tb, + TreeDbTerm **, // out + void *, + int), + void *context); +static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret, Eterm *partly_bound_key); static Sint cmp_partly_bound(Eterm partly_bound_key, Eterm bound_key); static Sint do_cmp_partly_bound(Eterm a, Eterm b, int *done); static int analyze_pattern(DbTableTree *tb, Eterm pattern, - struct mp_info *mpi); + extra_match_validator_t extra_validator, /* Optional callback */ + struct mp_info *mpi); static int doit_select(DbTableTree *tb, TreeDbTerm *this, void *ptr, @@ -336,6 +372,10 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, int forward); +static int doit_select_replace(DbTableTree *tb, + TreeDbTerm **this_ptr, + void *ptr, + int forward); static int partly_bound_can_match_lesser(Eterm partly_bound_1, Eterm partly_bound_2); @@ -369,27 +409,31 @@ static int db_erase_tree(DbTable *tbl, Eterm key, Eterm *ret); static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_tree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); -static int db_select_tree(Process *p, DbTable *tbl, +static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reversed, Eterm *ret); -static int db_select_count_tree(Process *p, DbTable *tbl, +static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret); -static int db_select_chunk_tree(Process *p, DbTable *tbl, +static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reversed, Eterm *ret); static int db_select_continue_tree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_count_continue_tree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); -static int db_select_delete_tree(Process *p, DbTable *tbl, +static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret); static int db_select_delete_continue_tree(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); +static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret); +static int db_select_replace_continue_tree(Process *p, DbTable *tbl, + Eterm continuation, Eterm *ret); static int db_take_tree(Process *, DbTable *, Eterm, Eterm *); -static void db_print_tree(int to, void *to_arg, +static void db_print_tree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); static int db_free_table_tree(DbTable *tbl); -static int db_free_table_continue_tree(DbTable *tbl); +static SWord db_free_table_continue_tree(DbTable *tbl, SWord); static void db_foreach_offheap_tree(DbTable *, void (*)(ErlOffHeap *, void *), @@ -436,17 +480,14 @@ DbTableMethod db_tree = db_select_delete_continue_tree, db_select_count_tree, db_select_count_continue_tree, + db_select_replace_tree, + db_select_replace_continue_tree, db_take_tree, db_delete_all_objects_tree, db_free_table_tree, db_free_table_continue_tree, db_print_tree, db_foreach_offheap_tree, -#ifdef HARDDEBUG - db_check_table_tree, -#else - NULL, -#endif db_lookup_dbterm_tree, db_finalize_dbterm_tree @@ -935,17 +976,15 @@ static int db_select_continue_tree(Process *p, if (arityval(*tptr) != 8) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - if (!is_small(tptr[4]) || !is_binary(tptr[5]) || + if (!is_small(tptr[4]) || !(is_list(tptr[6]) || tptr[6] == NIL) || !is_small(tptr[7]) || !is_small(tptr[8])) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); lastkey = tptr[2]; end_condition = tptr[3]; - if (!(thing_subtag(*binary_val(tptr[5])) == REFC_BINARY_SUBTAG)) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - mp = ((ProcBin *) binary_val(tptr[5]))->val; - if (!IsMatchProgBinary(mp)) + mp = erts_db_get_match_prog_binary(tptr[5]); + if (!mp) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); chunk_size = signed_val(tptr[4]); @@ -956,7 +995,7 @@ static int db_select_continue_tree(Process *p, sc.lastobj = NULL; sc.max = 1000; sc.keypos = tb->common.keypos; - sc.all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS; + sc.all_objects = mp->intern.flags & BIN_FLAG_ALL_OBJECTS; sc.chunk_size = chunk_size; reverse = unsigned_val(tptr[7]); sc.got = signed_val(tptr[8]); @@ -1060,7 +1099,7 @@ static int db_select_continue_tree(Process *p, } -static int db_select_tree(Process *p, DbTable *tbl, +static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { /* Strategy: Traverse backwards to build resulting list from tail to head */ @@ -1097,7 +1136,7 @@ static int db_select_tree(Process *p, DbTable *tbl, sc.got = 0; sc.chunk_size = 0; - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1111,7 +1150,7 @@ static int db_select_tree(Process *p, DbTable *tbl, if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { - doit_select(tb,mpi.save_term,&sc,0 /* direction doesn't matter */); + doit_select(tb,*(mpi.save_term),&sc,0 /* direction doesn't matter */); RET_TO_BIF(sc.accum,DB_ERROR_NONE); } @@ -1145,15 +1184,15 @@ static int db_select_tree(Process *p, DbTable *tbl, key = GETKEY(tb, sc.lastobj); sz = size_object(key); - hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE); + hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - mpb=db_make_mp_binary(p,mpi.mp,&hp); + (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; + mpb= erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 (hp, - tb->common.id, + tid, key, sc.end_condition, /* From the match program, needn't be copied */ make_small(0), /* Chunk size of zero means not chunked to the @@ -1208,10 +1247,8 @@ static int db_select_count_continue_tree(Process *p, lastkey = tptr[2]; end_condition = tptr[3]; - if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG)) - RET_TO_BIF(NIL,DB_ERROR_BADPARAM); - mp = ((ProcBin *) binary_val(tptr[4]))->val; - if (!IsMatchProgBinary(mp)) + mp = erts_db_get_match_prog_binary(tptr[4]); + if (!mp) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); sc.p = p; @@ -1267,7 +1304,7 @@ static int db_select_count_continue_tree(Process *p, } -static int db_select_count_tree(Process *p, DbTable *tbl, +static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; @@ -1302,7 +1339,7 @@ static int db_select_count_tree(Process *p, DbTable *tbl, sc.keypos = tb->common.keypos; sc.got = 0; - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1316,7 +1353,7 @@ static int db_select_count_tree(Process *p, DbTable *tbl, if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { - doit_select_count(tb,mpi.save_term,&sc,0 /* dummy */); + doit_select_count(tb,*(mpi.save_term),&sc,0 /* dummy */); RET_TO_BIF(erts_make_integer(sc.got,p),DB_ERROR_NONE); } @@ -1338,22 +1375,22 @@ static int db_select_count_tree(Process *p, DbTable *tbl, key = GETKEY(tb, sc.lastobj); sz = size_object(key); if (IS_USMALL(0, sc.got)) { - hp = HAlloc(p, sz + PROC_BIN_SIZE + 6); + hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); egot = make_small(sc.got); } else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6); + hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6); egot = uint_to_big(sc.got, hp); hp += BIG_UINT_HEAP_SIZE; } key = copy_struct(key, sz, &hp, &MSO(p)); if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - mpb = db_make_mp_binary(p,mpi.mp,&hp); + (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; + mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE5 (hp, - tb->common.id, + tid, key, sc.end_condition, /* From the match program, needn't be copied */ mpb, @@ -1367,7 +1404,7 @@ static int db_select_count_tree(Process *p, DbTable *tbl, } -static int db_select_chunk_tree(Process *p, DbTable *tbl, +static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret) @@ -1405,7 +1442,7 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, sc.got = 0; sc.chunk_size = chunk_size; - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } @@ -1419,7 +1456,7 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { - doit_select(tb,mpi.save_term,&sc, 0 /* direction doesn't matter */); + doit_select(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */); if (sc.accum != NIL) { hp=HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp,sc.accum,am_EOT),DB_ERROR_NONE); @@ -1470,15 +1507,15 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, key = GETKEY(tb, sc.lastobj); sz = size_object(key); - hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE); + hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - mpb = db_make_mp_binary(p,mpi.mp,&hp); + (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; + mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 (hp, - tb->common.id, + tid, key, sc.end_condition, /* From the match program, needn't be copied */ @@ -1495,15 +1532,15 @@ static int db_select_chunk_tree(Process *p, DbTable *tbl, key = GETKEY(tb, sc.lastobj); sz = size_object(key); - hp = HAlloc(p, 9 + sz + PROC_BIN_SIZE); + hp = HAlloc(p, 9 + sz + ERTS_MAGIC_REF_THING_SIZE); key = copy_struct(key, sz, &hp, &MSO(p)); if (mpi.all_objects) - (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; - mpb = db_make_mp_binary(p,mpi.mp,&hp); + (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; + mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE8 (hp, - tb->common.id, + tid, key, sc.end_condition, /* From the match program, needn't be copied */ make_small(chunk_size), @@ -1558,7 +1595,7 @@ static int db_select_delete_continue_tree(Process *p, sc.erase_lastterm = 0; /* Before first RET_TO_BIF */ sc.lastterm = NULL; - mp = ((ProcBin *) binary_val(tptr[4]))->val; + mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); sc.p = p; sc.tb = tb; if (is_big(tptr[5])) { @@ -1609,7 +1646,7 @@ static int db_select_delete_continue_tree(Process *p, #undef RET_TO_BIF } -static int db_select_delete_tree(Process *p, DbTable *tbl, +static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { DbTableTree *tb = &tbl->tree; @@ -1647,7 +1684,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, sc.keypos = tb->common.keypos; sc.tb = tb; - if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { + if ((errcode = analyze_pattern(tb, pattern, NULL, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(0,errcode); } @@ -1660,7 +1697,7 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, if (!mpi.got_partial && mpi.some_limitation && CMP_EQ(mpi.least,mpi.most)) { - doit_select_delete(tb,mpi.save_term,&sc, 0 /* direction doesn't + doit_select_delete(tb,*(mpi.save_term),&sc, 0 /* direction doesn't matter */); RET_TO_BIF(erts_make_integer(sc.accum,p),DB_ERROR_NONE); } @@ -1682,20 +1719,20 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, key = GETKEY(tb, (sc.lastterm)->dbterm.tpl); sz = size_object(key); if (IS_USMALL(0, sc.accum)) { - hp = HAlloc(p, sz + PROC_BIN_SIZE + 6); + hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); eaccsum = make_small(sc.accum); } else { - hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + PROC_BIN_SIZE + 6); + hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6); eaccsum = uint_to_big(sc.accum, hp); hp += BIG_UINT_HEAP_SIZE; } key = copy_struct(key, sz, &hp, &MSO(p)); - mpb = db_make_mp_binary(p,mpi.mp,&hp); + mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); continuation = TUPLE5 (hp, - tb->common.id, + tid, key, sc.end_condition, /* From the match program, needn't be copied */ mpb, @@ -1712,6 +1749,208 @@ static int db_select_delete_tree(Process *p, DbTable *tbl, } +static int db_select_replace_continue_tree(Process *p, + DbTable *tbl, + Eterm continuation, + Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + DbTreeStack* stack; + struct select_replace_context sc; + unsigned sz; + Eterm *hp; + Eterm lastkey; + Eterm end_condition; + Binary *mp; + Eterm key; + Eterm *tptr; + Eterm ereplaced; + Sint prev_replaced; + + +#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); + + /* Decode continuation. We know it's a tuple and everything else as + this is only called by ourselves */ + + /* continuation: + {Table, Lastkey, EndCondition, MatchProgBin, HowManyReplaced}*/ + + tptr = tuple_val(continuation); + + if (arityval(*tptr) != 5) + erts_exit(ERTS_ERROR_EXIT,"Internal error in ets:select_replace/1"); + + lastkey = tptr[2]; + end_condition = tptr[3]; + mp = erts_db_get_match_prog_binary_unchecked(tptr[4]); + + sc.p = p; + sc.mp = mp; + sc.end_condition = NIL; + sc.lastobj = NULL; + sc.max = 1000; + sc.keypos = tb->common.keypos; + if (is_big(tptr[5])) { + sc.replaced = big_to_uint32(tptr[5]); + } else { + sc.replaced = unsigned_val(tptr[5]); + } + prev_replaced = sc.replaced; + + stack = get_any_stack(tb); + traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc); + release_stack(tb,stack); + + // the more objects we've replaced, the more reductions we've consumed + BUMP_REDS(p, MIN(2000, (1000 - sc.max) + (sc.replaced - prev_replaced))); + + if (sc.max > 0) { + RET_TO_BIF(erts_make_integer(sc.replaced,p), DB_ERROR_NONE); + } + key = GETKEY(tb, sc.lastobj); + if (end_condition != NIL && + (cmp_partly_bound(end_condition,key) > 0)) { + /* done anyway */ + RET_TO_BIF(make_small(sc.replaced),DB_ERROR_NONE); + } + /* Not done yet, let's trap. */ + sz = size_object(key); + if (IS_USMALL(0, sc.replaced)) { + hp = HAlloc(p, sz + 6); + ereplaced = make_small(sc.replaced); + } + else { + hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + 6); + ereplaced = uint_to_big(sc.replaced, hp); + hp += BIG_UINT_HEAP_SIZE; + } + key = copy_struct(key, sz, &hp, &MSO(p)); + continuation = TUPLE5 + (hp, + tptr[1], + key, + tptr[3], + tptr[4], + ereplaced); + RET_TO_BIF(bif_trap1(&ets_select_replace_continue_exp, p, continuation), + DB_ERROR_NONE); + +#undef RET_TO_BIF +} + +static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret) +{ + DbTableTree *tb = &tbl->tree; + DbTreeStack* stack; + struct select_replace_context sc; + struct mp_info mpi; + Eterm lastkey = THE_NON_VALUE; + Eterm key; + Eterm continuation; + unsigned sz; + Eterm *hp; + TreeDbTerm *this; + int errcode; + Eterm ereplaced; + Eterm mpb; + + +#define RET_TO_BIF(Term,RetVal) do { \ + if (mpi.mp != NULL) { \ + erts_bin_free(mpi.mp); \ + } \ + *ret = (Term); \ + return RetVal; \ + } while(0) + + mpi.mp = NULL; + + sc.lastobj = NULL; + sc.p = p; + sc.tb = tb; + sc.max = 1000; + sc.end_condition = NIL; + sc.keypos = tb->common.keypos; + sc.replaced = 0; + + if ((errcode = analyze_pattern(tb, pattern, db_match_keeps_key, &mpi)) != DB_ERROR_NONE) { + RET_TO_BIF(NIL,errcode); + } + + if (!mpi.something_can_match) { + RET_TO_BIF(make_small(0),DB_ERROR_NONE); + /* can't possibly match anything */ + } + + sc.mp = mpi.mp; + sc.all_objects = mpi.all_objects; + + stack = get_static_stack(tb); + if (!mpi.got_partial && mpi.some_limitation && + CMP_EQ(mpi.least,mpi.most)) { + TreeDbTerm* term = *(mpi.save_term); + doit_select_replace(tb,mpi.save_term,&sc,0 /* dummy */); + if (stack != NULL) { + if (TOP_NODE(stack) == term) + // throw away potentially invalid reference + REPLACE_TOP_NODE(stack, *(mpi.save_term)); + release_stack(tb, stack); + } + RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); + } + + if (stack == NULL) + stack = get_any_stack(tb); + + if (mpi.some_limitation) { + if ((this = find_next_from_pb_key(tb, stack, mpi.most)) != NULL) { + lastkey = GETKEY(tb, this->dbterm.tpl); + } + sc.end_condition = mpi.least; + } + + traverse_update_backwards(tb, stack, lastkey, &doit_select_replace, &sc); + release_stack(tb,stack); + // the more objects we've replaced, the more reductions we've consumed + BUMP_REDS(p, MIN(2000, (1000 - sc.max) + sc.replaced)); + if (sc.max > 0) { + RET_TO_BIF(erts_make_integer(sc.replaced,p),DB_ERROR_NONE); + } + + key = GETKEY(tb, sc.lastobj); + sz = size_object(key); + if (IS_USMALL(0, sc.replaced)) { + hp = HAlloc(p, sz + ERTS_MAGIC_REF_THING_SIZE + 6); + ereplaced = make_small(sc.replaced); + } + else { + hp = HAlloc(p, BIG_UINT_HEAP_SIZE + sz + ERTS_MAGIC_REF_THING_SIZE + 6); + ereplaced = uint_to_big(sc.replaced, hp); + hp += BIG_UINT_HEAP_SIZE; + } + key = copy_struct(key, sz, &hp, &MSO(p)); + if (mpi.all_objects) + (mpi.mp)->intern.flags |= BIN_FLAG_ALL_OBJECTS; + mpb = erts_db_make_match_prog_ref(p,mpi.mp,&hp); + + continuation = TUPLE5 + (hp, + tid, + key, + sc.end_condition, /* From the match program, needn't be copied */ + mpb, + ereplaced); + + /* Don't free mpi.mp, so don't use macro */ + *ret = bif_trap1(&ets_select_replace_continue_exp, p, continuation); + return DB_ERROR_NONE; + +#undef RET_TO_BIF + +} + static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableTree *tb = &tbl->tree; @@ -1740,7 +1979,7 @@ static int db_take_tree(Process *p, DbTable *tbl, Eterm key, Eterm *ret) /* Display tree contents (for dump) */ -static void db_print_tree(int to, void *to_arg, +static void db_print_tree(fmtfn_t to, void *to_arg, int show, DbTable *tbl) { @@ -1761,23 +2000,22 @@ static void db_print_tree(int to, void *to_arg, /* release all memory occupied by a single table */ static int db_free_table_tree(DbTable *tbl) { - while (!db_free_table_continue_tree(tbl)) + while (db_free_table_continue_tree(tbl, ERTS_SWORD_MAX) < 0) ; return 1; } -static int db_free_table_continue_tree(DbTable *tbl) +static SWord db_free_table_continue_tree(DbTable *tbl, SWord reds) { DbTableTree *tb = &tbl->tree; - int result; if (!tb->deletion) { tb->static_stack.pos = 0; tb->deletion = 1; PUSH_NODE(&tb->static_stack, tb->root); } - result = do_free_tree_cont(tb, DELETE_RECORD_LIMIT); - if (result) { /* Completely done. */ + reds = do_free_tree_continue(tb, reds); + if (reds >= 0) { /* Completely done. */ erts_db_free(ERTS_ALC_T_DB_STK, (DbTable *) tb, (void *) tb->static_stack.array, @@ -1785,7 +2023,7 @@ static int db_free_table_continue_tree(DbTable *tbl) ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable)); } - return result; + return reds; } static int db_delete_all_objects_tree(Process* p, DbTable* tbl) @@ -1958,8 +2196,9 @@ static TreeDbTerm *linkout_object_tree(DbTableTree *tb, ** For the select functions, analyzes the pattern and determines which ** part of the tree should be searched. Also compiles the match program */ -static int analyze_pattern(DbTableTree *tb, Eterm pattern, - struct mp_info *mpi) +static int analyze_pattern(DbTableTree *tb, Eterm pattern, + extra_match_validator_t extra_validator, /* Optional callback */ + struct mp_info *mpi) { Eterm lst, tpl, ttpl; Eterm *matches,*guards, *bodies; @@ -1997,7 +2236,10 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, i = 0; for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) { - Eterm body; + Eterm match; + Eterm guard; + Eterm body; + ttpl = CAR(list_val(lst)); if (!is_tuple(ttpl)) { if (buff != sbuff) { @@ -2012,9 +2254,17 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, } return DB_ERROR_BADPARAM; } - matches[i] = tpl = ptpl[1]; - guards[i] = ptpl[2]; + matches[i] = match = tpl = ptpl[1]; + guards[i] = guard = ptpl[2]; bodies[i] = body = ptpl[3]; + + if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) { + if (buff != sbuff) { + erts_free(ERTS_ALC_T_DB_TMP, buff); + } + return DB_ERROR_BADPARAM; + } + if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { mpi->all_objects = 0; @@ -2022,7 +2272,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, ++i; partly_bound = NIL; - res = key_given(tb, tpl, &mpi->save_term, &partly_bound); + res = key_given(tb, tpl, &(mpi->save_term), &partly_bound); if ( res >= 0 ) { /* Can match something */ key = 0; mpi->something_can_match = 1; @@ -2068,7 +2318,7 @@ static int analyze_pattern(DbTableTree *tb, Eterm pattern, return DB_ERROR_NONE; } -static int do_free_tree_cont(DbTableTree *tb, int num_left) +static SWord do_free_tree_continue(DbTableTree *tb, SWord reds) { TreeDbTerm *root; TreeDbTerm *p; @@ -2087,15 +2337,14 @@ static int do_free_tree_cont(DbTableTree *tb, int num_left) root = p; } else { free_term(tb, root); - if (--num_left > 0) { - break; - } else { - return 0; /* Done enough for now */ - } + if (--reds < 0) { + return reds; /* Done enough for now */ + } + break; } } } - return 1; + return reds; } /* @@ -2526,6 +2775,58 @@ static TreeDbTerm **find_node2(DbTableTree *tb, Eterm key) return this; } +/* + * Find node and return the address of the node pointer (NULL if not found) + * Tries to reuse the existing stack for performance. + */ + +static TreeDbTerm **find_ptr(DbTableTree *tb, DbTreeStack *stack, TreeDbTerm *this) { + Eterm key = GETKEY(tb, this->dbterm.tpl); + TreeDbTerm *tmp; + TreeDbTerm *parent; + Sint c; + + if(( tmp = TOP_NODE(stack)) != NULL) { + if (!cmp_key_eq(tb,key,tmp)) { + /* Start from the beginning */ + stack->pos = stack->slot = 0; + } + } + if (EMPTY_NODE(stack)) { /* Have to rebuild the stack */ + if (( tmp = tb->root ) == NULL) + return NULL; + for (;;) { + PUSH_NODE(stack, tmp); + if (( c = cmp_key(tb,key,tmp) ) < 0) { + if (tmp->left == NULL) /* We are at the next + and the element does + not exist */ + break; + else + tmp = tmp->left; + } else if (c > 0) { + if (tmp->right == NULL) /* Done */ + return NULL; + else + tmp = tmp->right; + } else + break; + } + } + + if (TOP_NODE(stack) != this) + return NULL; + + parent = TOPN_NODE(stack, 1); + if (parent == NULL) + return ((this != tb->root) ? NULL : &(tb->root)); + if (parent->left == this) + return &(parent->left); + if (parent->right == this) + return &(parent->right); + return NULL; +} + static int db_lookup_dbterm_tree(Process *p, DbTable *tbl, Eterm key, Eterm obj, DbUpdateHandle* handle) @@ -2667,13 +2968,60 @@ static void traverse_forward(DbTableTree *tb, } /* + * Traverse the tree with an update callback function, used by db_select_replace + */ +static void traverse_update_backwards(DbTableTree *tb, + DbTreeStack* stack, + Eterm lastkey, + int (*doit)(DbTableTree*, + TreeDbTerm**, + void*, + int), + void* context) +{ + int res; + TreeDbTerm *this, *next, **this_ptr; + + if (lastkey == THE_NON_VALUE) { + stack->pos = stack->slot = 0; + if (( this = tb->root ) == NULL) { + return; + } + while (this != NULL) { + PUSH_NODE(stack, this); + this = this->right; + } + this = TOP_NODE(stack); + this_ptr = find_ptr(tb, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; + } else { + next = find_prev(tb, stack, lastkey); + } + + while ((this = next) != NULL) { + this_ptr = find_ptr(tb, stack, this); + ASSERT(this_ptr != NULL); + res = (*doit)(tb, this_ptr, context, 0); + REPLACE_TOP_NODE(stack, *this_ptr); + next = find_prev(tb, stack, GETKEY(tb, (*this_ptr)->dbterm.tpl)); + if (!res) + return; + } +} + +/* * Returns 0 if not given 1 if given and -1 on no possible match * if key is given; *ret is set to point to the object concerned. */ -static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret, +static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm ***ret, Eterm *partly_bound) { - TreeDbTerm *this; + TreeDbTerm **this; Eterm key; ASSERT(ret != NULL); @@ -2683,7 +3031,7 @@ static int key_given(DbTableTree *tb, Eterm pattern, TreeDbTerm **ret, if (is_non_value(key)) return -1; /* can't possibly match anything */ if (!db_has_variable(key)) { /* Bound key */ - if (( this = find_node(tb, key) ) == NULL) { + if (( this = find_node2(tb, key) ) == NULL) { return -1; } *ret = this; @@ -3106,6 +3454,46 @@ static int doit_select_delete(DbTableTree *tb, TreeDbTerm *this, void *ptr, return 1; } +static int doit_select_replace(DbTableTree *tb, TreeDbTerm **this, void *ptr, + int forward) +{ + struct select_replace_context *sc = (struct select_replace_context *) ptr; + Eterm ret; + + sc->lastobj = (*this)->dbterm.tpl; + + /* Always backwards traversing */ + if (sc->end_condition != NIL && + (cmp_partly_bound(sc->end_condition, + GETKEY_WITH_POS(sc->keypos, (*this)->dbterm.tpl)) > 0)) { + return 0; + } + ret = db_match_dbterm(&tb->common, sc->p, sc->mp, 0, + &(*this)->dbterm, NULL, 0); + + if (is_value(ret)) { + TreeDbTerm* new; + TreeDbTerm* old = *this; +#ifdef DEBUG + Eterm key = db_getkey(tb->common.keypos, ret); + ASSERT(is_value(key)); + ASSERT(cmp_key(tb, key, old) == 0); +#endif + new = new_dbterm(tb, ret); + new->left = old->left; + new->right = old->right; + new->balance = old->balance; + sc->lastobj = new->dbterm.tpl; + *this = new; + free_term(tb, old); + ++(sc->replaced); + } + if (--(sc->max) <= 0) { + return 0; + } + return 1; +} + #ifdef TREE_DEBUG static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show, TreeDbTerm *t, int offset) @@ -3134,6 +3522,9 @@ static void do_dump_tree2(DbTableTree* tb, int to, void *to_arg, int show, #ifdef HARDDEBUG +/* + * No called, but kept as it might come to use + */ void db_check_table_tree(DbTable *tbl) { DbTableTree *tb = &tbl->tree; |