diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/erl_db.c | 72 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 52 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_hash.c | 116 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 51 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_util.h | 35 |
5 files changed, 209 insertions, 117 deletions
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 4132a54934..0a50af4d1a 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -81,14 +81,6 @@ static BIF_RETTYPE db_bif_fail(Process* p, Uint freason, /* Get a key from any table structure and a tagged object */ #define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj)) - -/* How safe are we from double-hits or missed objects -** when iterating without fixation? */ -enum DbIterSafety { - ITER_UNSAFE, /* Must fixate to be safe */ - ITER_SAFE_LOCKED, /* Safe while table is locked, not between trap calls */ - ITER_SAFE /* No need to fixate at all */ -}; # define ITERATION_SAFETY(Proc,Tab) \ ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \ || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \ @@ -195,9 +187,6 @@ static int fixed_tabs_find(DbFixation* first, DbFixation* fix) #define ERTS_RBT_WANT_DELETE #define ERTS_RBT_WANT_FOREACH #define ERTS_RBT_WANT_FOREACH_DESTROY -#ifdef DEBUG -# define ERTS_RBT_WANT_LOOKUP -#endif #define ERTS_RBT_UNDEF #include "erl_rbtree.h" @@ -2287,6 +2276,7 @@ static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_WRITE_REC; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); ASSERT(is_tuple(a1)); @@ -2296,10 +2286,11 @@ static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, &ets_select_delete_continue_exp); - cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret); + cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret,&safety); - if(!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { - unfix_table_locked(p, tb, &kind); + if(!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); + unfix_table_locked(p, tb, &kind); } db_unlock(tb, kind); @@ -2337,7 +2328,8 @@ BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P,tb); @@ -2729,7 +2721,7 @@ ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size) cret = tb->common.meth->db_select_chunk(p, tb, tid, ms, chunk_size, 0 /* not reversed */, - &ret); + &ret, safety); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); } @@ -2756,7 +2748,8 @@ ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size) } -/* We get here instead of in the real BIF when trapping */ +/* Trap here from: ets_select_1/2/3 + */ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) { Process *p = BIF_P; @@ -2767,6 +2760,7 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_READ; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); @@ -2776,11 +2770,13 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, &ets_select_continue_exp); - cret = tb->common.meth->db_select_continue(p, tb, a1, - &ret); + cret = tb->common.meth->db_select_continue(p, tb, a1, &ret, &safety); - if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { - unfix_table_locked(p, tb, &kind); + if (!DID_TRAP(p,ret)) { + if (safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); + unfix_table_locked(p, tb, &kind); + } } db_unlock(tb, kind); @@ -2805,8 +2801,12 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) BIF_RETTYPE ets_select_1(BIF_ALIST_1) { return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1); + /* TRAP: ets_select_trap_1 */ } +/* + * Common impl for select/1, select_reverse/1, match/1 and match_object/1 + */ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) { BIF_RETTYPE result; @@ -2814,7 +2814,7 @@ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) int cret; Eterm ret; Eterm *tptr; - enum DbIterSafety safety; + enum DbIterSafety safety, safety_copy; CHECK_TABLES(); @@ -2839,7 +2839,8 @@ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) local_fix_table(tb); } - cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret); + safety_copy = safety; + cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret, &safety_copy); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); @@ -2871,6 +2872,7 @@ BIF_RETTYPE ets_select_2(BIF_ALIST_2) DbTable* tb; DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2); return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2); + /* TRAP: ets_select_trap_1 */ } static BIF_RETTYPE @@ -2888,7 +2890,7 @@ ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms) local_fix_table(tb); } - cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret); + cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret, safety); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); @@ -2926,6 +2928,7 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_READ; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); @@ -2935,9 +2938,10 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, &ets_select_count_continue_exp); - cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret); + cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret, &safety); - if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { + if (!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); unfix_table_locked(p, tb, &kind); } db_unlock(tb, kind); @@ -2975,7 +2979,8 @@ BIF_RETTYPE ets_select_count_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); @@ -3014,6 +3019,7 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_WRITE_REC; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); ASSERT(is_tuple(a1)); @@ -3023,9 +3029,10 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, &ets_select_replace_continue_exp); - cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret); + cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret,&safety); - if(!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { + if(!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); unfix_table_locked(p, tb, &kind); } @@ -3068,7 +3075,8 @@ BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P,tb); @@ -3120,7 +3128,7 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3) } cret = tb->common.meth->db_select_chunk(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, chunk_size, - 1 /* reversed */, &ret); + 1 /* reversed */, &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); } @@ -3165,7 +3173,7 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2) local_fix_table(tb); } cret = tb->common.meth->db_select(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, - 1 /*reversed*/, &ret); + 1 /*reversed*/, &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index 75ac1c4a93..0402c6b7b4 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -116,24 +116,31 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_catree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reversed, Eterm *ret); + Eterm pattern, int reversed, Eterm *ret, + enum DbIterSafety); static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret); + int reversed, Eterm *ret, enum DbIterSafety); static int db_select_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_replace_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_catree(Process *, DbTable *, Eterm, Eterm *); static void db_print_catree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); @@ -1843,7 +1850,8 @@ static int db_slot_catree(Process *p, DbTable *tbl, static int db_select_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; @@ -1856,7 +1864,8 @@ static int db_select_continue_catree(Process *p, } static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret) + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1871,7 +1880,8 @@ static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_count_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; @@ -1885,7 +1895,8 @@ static int db_select_count_continue_catree(Process *p, } static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1899,7 +1910,8 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret) + int reversed, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1915,7 +1927,8 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_delete_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; @@ -1931,7 +1944,8 @@ static int db_select_delete_continue_catree(Process *p, } static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; @@ -1948,7 +1962,8 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, } static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety_p) { int result; CATreeRootIterator iter; @@ -1961,7 +1976,8 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, } static int db_select_replace_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret) + Eterm continuation, Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index a2690f6d6a..fac4703620 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -404,26 +404,31 @@ static int db_slot_hash(Process *p, DbTable *tbl, static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reverse, Eterm *ret); + int reverse, Eterm *ret, enum DbIterSafety); static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret); + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety); static int db_select_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_count_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); - + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_replace_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_hash(Process *, DbTable *, Eterm, Eterm *); static void db_print_hash(fmtfn_t to, @@ -535,7 +540,7 @@ DbTableMethod db_hash = db_select_chunk_hash, db_select_hash, db_select_delete_hash, - db_select_continue_hash, /* hmm continue_hash? */ + db_select_continue_hash, db_select_delete_continue_hash, db_select_count_hash, db_select_count_continue_hash, @@ -1209,6 +1214,7 @@ struct traverse_context_t_ DbTableHash* tb; Eterm tid; Eterm* prev_continuation_tptr; + enum DbIterSafety safety; }; @@ -1476,11 +1482,11 @@ static ERTS_INLINE int on_simple_trap(Export* trap_function, BUMP_ALL_REDS(ctx->p); if (IS_USMALL(0, got)) { - hp = HAllocX(ctx->p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE); + hp = HAllocX(ctx->p, base_halloc_sz + 6, ERTS_MAGIC_REF_THING_SIZE); egot = make_small(got); } else { - hp = HAllocX(ctx->p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5, + hp = HAllocX(ctx->p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 6, ERTS_MAGIC_REF_THING_SIZE); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; @@ -1497,12 +1503,13 @@ static ERTS_INLINE int on_simple_trap(Export* trap_function, mpb = ctx->prev_continuation_tptr[3]; } - continuation = TUPLE4( + continuation = TUPLE5( hp, ctx->tid, make_small(slot_ix), mpb, - egot); + egot, + make_small(ctx->safety)); ERTS_BIF_PREP_TRAP1(*ret, trap_function, ctx->p, continuation); return DB_ERROR_NONE; } @@ -1512,17 +1519,18 @@ static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, Eterm* tid_ptr, Sint* slot_ix_p, Binary** mpp, - Sint* got_p) + Sint* got_p, + enum DbIterSafety* safety_p) { Eterm* tptr; ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); - if (arityval(*tptr) != 4) + if (*tptr != make_arityval(5)) return 1; - if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) { + if (!is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4])) + || !is_small(tptr[5])) return 1; - } *tptr_ptr = tptr; *tid_ptr = tptr[1]; @@ -1534,6 +1542,7 @@ static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, else { *got_p = unsigned_val(tptr[4]); } + *safety_p = signed_val(tptr[5]); return 0; } @@ -1657,32 +1666,34 @@ static int select_chunk_on_trap(traverse_context_t* ctx_base, if (ctx->base.prev_continuation_tptr == NULL) { Eterm tid = ctx->base.tid; /* First time we're trapping */ - hp = HAllocX(ctx->base.p, 7 + ERTS_MAGIC_REF_THING_SIZE, + hp = HAllocX(ctx->base.p, 8 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); if (is_atom(tid)) tid = erts_db_make_tid(ctx->base.p, &ctx->base.tb->common); mpb = erts_db_make_match_prog_ref(ctx->base.p, *mpp, &hp); - continuation = TUPLE6( + continuation = TUPLE7( hp, tid, make_small(slot_ix), make_small(ctx->chunk_size), mpb, ctx->match_list, - make_small(got)); + make_small(got), + make_small(ctx->base.safety)); *mpp = NULL; /* otherwise the caller will destroy it */ } else { /* Not the first time we're trapping; reuse continuation terms */ - hp = HAlloc(ctx->base.p, 7); - continuation = TUPLE6( + hp = HAlloc(ctx->base.p, 8); + continuation = TUPLE7( hp, ctx->base.prev_continuation_tptr[1], make_small(slot_ix), ctx->base.prev_continuation_tptr[3], ctx->base.prev_continuation_tptr[4], ctx->match_list, - make_small(got)); + make_small(got), + make_small(ctx->base.safety)); } ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->base.p, continuation); @@ -1690,14 +1701,14 @@ static int select_chunk_on_trap(traverse_context_t* ctx_base, } static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, - int reverse, Eterm *ret) + int reverse, Eterm *ret, enum DbIterSafety safety) { - return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret); + return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret, safety); } static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reverse, Eterm *ret) + int reverse, Eterm *ret, enum DbIterSafety safety) { select_chunk_context_t ctx; @@ -1709,6 +1720,7 @@ static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, ctx.base.tb = &tbl->hash; ctx.base.tid = tid; ctx.base.prev_continuation_tptr = NULL; + ctx.base.safety = safety; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = NIL; @@ -1784,10 +1796,11 @@ int select_chunk_continue_on_loop_ended(traverse_context_t* ctx_base, } /* - * This is called when select traps + * This is called when ets:select/1/2/3 traps + * and for ets:select/1 with user continuation term. */ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, - Eterm* ret) + Eterm* ret, enum DbIterSafety* safety_p) { select_chunk_context_t ctx; Eterm* tptr; @@ -1803,7 +1816,13 @@ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); - if (arityval(*tptr) != 6) + /* + * 6-tuple is select/1 user continuation term + * 7-tuple is select trap continuation + */ + if (*tptr == make_arityval(7) && is_small(tptr[7])) + *safety_p = signed_val(tptr[7]); + else if (*tptr != make_arityval(6)) goto badparam; if (!is_small(tptr[2]) || !is_small(tptr[3]) || @@ -1831,6 +1850,7 @@ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, ctx.base.tb = &tbl->hash; ctx.base.tid = tid; ctx.base.prev_continuation_tptr = tptr; + ctx.base.safety = *safety_p; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = match_list; @@ -1891,7 +1911,8 @@ static int select_count_on_trap(traverse_context_t* ctx, } static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { traverse_context_t ctx; Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS; @@ -1905,6 +1926,7 @@ static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; + ctx.safety = safety; return match_traverse( &ctx, @@ -1917,7 +1939,8 @@ static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, * This is called when select_count traps */ static int db_select_count_continue_hash(Process* p, DbTable* tbl, - Eterm continuation, Eterm* ret) + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { traverse_context_t ctx; Eterm* tptr; @@ -1928,7 +1951,8 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } @@ -1940,6 +1964,7 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; + ctx.safety = *safety_p; return match_traverse_continue( &ctx, chunk_size, @@ -2033,7 +2058,8 @@ static int select_delete_on_trap(traverse_context_t* ctx_base, } static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { select_delete_context_t ctx; Sint chunk_size = 0; @@ -2046,6 +2072,7 @@ static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, ctx.base.tb = &tbl->hash; ctx.base.tid = tid; ctx.base.prev_continuation_tptr = NULL; + ctx.base.safety = safety; ctx.fixated_by_me = ctx.base.tb->common.is_thread_safe ? 0 : 1; ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; @@ -2062,7 +2089,8 @@ static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, * This is called when select_delete traps */ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, - Eterm continuation, Eterm* ret) + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { select_delete_context_t ctx; Eterm* tptr; @@ -2072,7 +2100,8 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Sint slot_ix; Sint chunk_size = 0; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } @@ -2084,6 +2113,7 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, ctx.base.tb = &tbl->hash; ctx.base.tid = tid; ctx.base.prev_continuation_tptr = tptr; + ctx.base.safety = *safety_p; ctx.fixated_by_me = ONLY_WRITER(p, ctx.base.tb) ? 0 : 1; ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; @@ -2164,7 +2194,9 @@ static int select_replace_on_trap(traverse_context_t* ctx, slot_ix, got, mpp, ret); } -static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) +static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { traverse_context_t ctx; Sint chunk_size = 0; @@ -2194,7 +2226,9 @@ static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pat /* * This is called when select_replace traps */ -static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) +static int db_select_replace_continue_hash(Process* p, DbTable* tbl, + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { traverse_context_t ctx; Eterm* tptr; @@ -2205,7 +2239,8 @@ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm conti Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } @@ -2218,6 +2253,7 @@ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm conti ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; + ctx.safety = *safety_p; return match_traverse_continue( &ctx, chunk_size, diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index fe57348700..f9ba04f399 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -397,24 +397,31 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_tree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reversed, Eterm *ret); + Eterm pattern, int reversed, Eterm *ret, + enum DbIterSafety); static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret); + int reversed, Eterm *ret, enum DbIterSafety); static int db_select_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_replace_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_tree(Process *, DbTable *, Eterm, Eterm *); static void db_print_tree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); @@ -1160,7 +1167,8 @@ int db_select_continue_tree_common(Process *p, static int db_select_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; return db_select_continue_tree_common(p, &tb->common, @@ -1297,7 +1305,8 @@ int db_select_tree_common(Process *p, DbTable *tb, } static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret) + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety safety) { return db_select_tree_common(p, tbl, tid, pattern, reverse, ret, &tbl->tree, NULL); @@ -1408,7 +1417,8 @@ int db_select_count_continue_tree_common(Process *p, static int db_select_count_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; return db_select_count_continue_tree_common(p, tbl, @@ -1527,7 +1537,8 @@ int db_select_count_tree_common(Process *p, DbTable *tb, } static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_count_tree_common(p, tbl, @@ -1704,7 +1715,7 @@ int db_select_chunk_tree_common(Process *p, DbTable *tb, static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, - Eterm *ret) + Eterm *ret, enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_chunk_tree_common(p, tbl, @@ -1813,7 +1824,8 @@ int db_select_delete_continue_tree_common(Process *p, static int db_select_delete_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); @@ -1942,7 +1954,8 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, } static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_delete_tree_common(p, tbl, tid, pattern, ret, @@ -2052,7 +2065,8 @@ int db_select_replace_continue_tree_common(Process *p, static int db_select_replace_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { return db_select_replace_continue_tree_common(p, tbl, continuation, ret, &tbl->tree, NULL); @@ -2177,7 +2191,8 @@ int db_select_replace_tree_common(Process *p, DbTable *tbl, } static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { return db_select_replace_tree_common(p, tbl, tid, pattern, ret, &tbl->tree, NULL); diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h index e1af9210ea..e3d3c0e804 100644 --- a/erts/emulator/beam/erl_db_util.h +++ b/erts/emulator/beam/erl_db_util.h @@ -101,6 +101,14 @@ typedef struct { } u; } DbUpdateHandle; +/* How safe are we from double-hits or missed objects + * when iterating without fixation? + */ +enum DbIterSafety { + ITER_UNSAFE, /* Must fixate to be safe */ + ITER_SAFE_LOCKED, /* Safe while table is locked, not between trap calls */ + ITER_SAFE /* No need to fixate at all */ +}; typedef struct db_table_method { @@ -150,44 +158,53 @@ typedef struct db_table_method Eterm pattern, Sint chunk_size, int reverse, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, int reverse, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_delete)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_delete_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_count)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_count_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_replace)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_replace_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_take)(Process *, DbTable *, Eterm, Eterm *); SWord (*db_delete_all_objects)(Process* p, DbTable* db, SWord reds); |