diff options
-rw-r--r-- | erts/emulator/beam/erl_db.c | 72 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_catree.c | 52 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_hash.c | 431 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_tree.c | 51 | ||||
-rw-r--r-- | erts/emulator/beam/erl_db_util.h | 35 | ||||
-rw-r--r-- | lib/stdlib/doc/src/ets.xml | 16 | ||||
-rw-r--r-- | lib/stdlib/test/ets_SUITE.erl | 60 |
7 files changed, 417 insertions, 300 deletions
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 4132a54934..0a50af4d1a 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -81,14 +81,6 @@ static BIF_RETTYPE db_bif_fail(Process* p, Uint freason, /* Get a key from any table structure and a tagged object */ #define TERM_GETKEY(tb, obj) db_getkey((tb)->common.keypos, (obj)) - -/* How safe are we from double-hits or missed objects -** when iterating without fixation? */ -enum DbIterSafety { - ITER_UNSAFE, /* Must fixate to be safe */ - ITER_SAFE_LOCKED, /* Safe while table is locked, not between trap calls */ - ITER_SAFE /* No need to fixate at all */ -}; # define ITERATION_SAFETY(Proc,Tab) \ ((IS_TREE_TABLE((Tab)->common.status) || IS_CATREE_TABLE((Tab)->common.status) \ || ONLY_WRITER(Proc,Tab)) ? ITER_SAFE \ @@ -195,9 +187,6 @@ static int fixed_tabs_find(DbFixation* first, DbFixation* fix) #define ERTS_RBT_WANT_DELETE #define ERTS_RBT_WANT_FOREACH #define ERTS_RBT_WANT_FOREACH_DESTROY -#ifdef DEBUG -# define ERTS_RBT_WANT_LOOKUP -#endif #define ERTS_RBT_UNDEF #include "erl_rbtree.h" @@ -2287,6 +2276,7 @@ static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_WRITE_REC; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); ASSERT(is_tuple(a1)); @@ -2296,10 +2286,11 @@ static BIF_RETTYPE ets_select_delete_trap_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, &ets_select_delete_continue_exp); - cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret); + cret = tb->common.meth->db_select_delete_continue(p,tb,a1,&ret,&safety); - if(!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { - unfix_table_locked(p, tb, &kind); + if(!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); + unfix_table_locked(p, tb, &kind); } db_unlock(tb, kind); @@ -2337,7 +2328,8 @@ BIF_RETTYPE ets_internal_select_delete_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_delete(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P,tb); @@ -2729,7 +2721,7 @@ ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size) cret = tb->common.meth->db_select_chunk(p, tb, tid, ms, chunk_size, 0 /* not reversed */, - &ret); + &ret, safety); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); } @@ -2756,7 +2748,8 @@ ets_select3(Process* p, DbTable* tb, Eterm tid, Eterm ms, Sint chunk_size) } -/* We get here instead of in the real BIF when trapping */ +/* Trap here from: ets_select_1/2/3 + */ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) { Process *p = BIF_P; @@ -2767,6 +2760,7 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_READ; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); @@ -2776,11 +2770,13 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, &ets_select_continue_exp); - cret = tb->common.meth->db_select_continue(p, tb, a1, - &ret); + cret = tb->common.meth->db_select_continue(p, tb, a1, &ret, &safety); - if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { - unfix_table_locked(p, tb, &kind); + if (!DID_TRAP(p,ret)) { + if (safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); + unfix_table_locked(p, tb, &kind); + } } db_unlock(tb, kind); @@ -2805,8 +2801,12 @@ static BIF_RETTYPE ets_select_trap_1(BIF_ALIST_1) BIF_RETTYPE ets_select_1(BIF_ALIST_1) { return ets_select1(BIF_P, BIF_ets_select_1, BIF_ARG_1); + /* TRAP: ets_select_trap_1 */ } +/* + * Common impl for select/1, select_reverse/1, match/1 and match_object/1 + */ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) { BIF_RETTYPE result; @@ -2814,7 +2814,7 @@ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) int cret; Eterm ret; Eterm *tptr; - enum DbIterSafety safety; + enum DbIterSafety safety, safety_copy; CHECK_TABLES(); @@ -2839,7 +2839,8 @@ static BIF_RETTYPE ets_select1(Process *p, int bif_ix, Eterm arg1) local_fix_table(tb); } - cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret); + safety_copy = safety; + cret = tb->common.meth->db_select_continue(p,tb, arg1, &ret, &safety_copy); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); @@ -2871,6 +2872,7 @@ BIF_RETTYPE ets_select_2(BIF_ALIST_2) DbTable* tb; DB_BIF_GET_TABLE(tb, DB_READ, LCK_READ, BIF_ets_select_2); return ets_select2(BIF_P, tb, BIF_ARG_1, BIF_ARG_2); + /* TRAP: ets_select_trap_1 */ } static BIF_RETTYPE @@ -2888,7 +2890,7 @@ ets_select2(Process* p, DbTable* tb, Eterm tid, Eterm ms) local_fix_table(tb); } - cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret); + cret = tb->common.meth->db_select(p, tb, tid, ms, 0, &ret, safety); if (DID_TRAP(p,ret) && safety != ITER_SAFE) { fix_table_locked(p, tb); @@ -2926,6 +2928,7 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_READ; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); @@ -2935,9 +2938,10 @@ static BIF_RETTYPE ets_select_count_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_READ, kind, &ets_select_count_continue_exp); - cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret); + cret = tb->common.meth->db_select_count_continue(p, tb, a1, &ret, &safety); - if (!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { + if (!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); unfix_table_locked(p, tb, &kind); } db_unlock(tb, kind); @@ -2975,7 +2979,8 @@ BIF_RETTYPE ets_select_count_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_count(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); @@ -3014,6 +3019,7 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1) Eterm ret; Eterm *tptr; db_lock_kind_t kind = LCK_WRITE_REC; + enum DbIterSafety safety = ITER_SAFE; CHECK_TABLES(); ASSERT(is_tuple(a1)); @@ -3023,9 +3029,10 @@ static BIF_RETTYPE ets_select_replace_1(BIF_ALIST_1) DB_TRAP_GET_TABLE(tb, tptr[1], DB_WRITE, kind, &ets_select_replace_continue_exp); - cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret); + cret = tb->common.meth->db_select_replace_continue(p,tb,a1,&ret,&safety); - if(!DID_TRAP(p,ret) && ITERATION_SAFETY(p,tb) != ITER_SAFE) { + if(!DID_TRAP(p,ret) && safety != ITER_SAFE) { + ASSERT(erts_refc_read(&tb->common.fix_count,1)); unfix_table_locked(p, tb, &kind); } @@ -3068,7 +3075,8 @@ BIF_RETTYPE ets_select_replace_2(BIF_ALIST_2) if (safety == ITER_UNSAFE) { local_fix_table(tb); } - cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, &ret); + cret = tb->common.meth->db_select_replace(BIF_P, tb, BIF_ARG_1, BIF_ARG_2, + &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P,tb); @@ -3120,7 +3128,7 @@ BIF_RETTYPE ets_select_reverse_3(BIF_ALIST_3) } cret = tb->common.meth->db_select_chunk(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, chunk_size, - 1 /* reversed */, &ret); + 1 /* reversed */, &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); } @@ -3165,7 +3173,7 @@ BIF_RETTYPE ets_select_reverse_2(BIF_ALIST_2) local_fix_table(tb); } cret = tb->common.meth->db_select(BIF_P,tb, BIF_ARG_1, BIF_ARG_2, - 1 /*reversed*/, &ret); + 1 /*reversed*/, &ret, safety); if (DID_TRAP(BIF_P,ret) && safety != ITER_SAFE) { fix_table_locked(BIF_P, tb); diff --git a/erts/emulator/beam/erl_db_catree.c b/erts/emulator/beam/erl_db_catree.c index 75ac1c4a93..0402c6b7b4 100644 --- a/erts/emulator/beam/erl_db_catree.c +++ b/erts/emulator/beam/erl_db_catree.c @@ -116,24 +116,31 @@ static int db_erase_object_catree(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_catree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reversed, Eterm *ret); + Eterm pattern, int reversed, Eterm *ret, + enum DbIterSafety); static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret); + int reversed, Eterm *ret, enum DbIterSafety); static int db_select_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_replace_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_catree(Process *, DbTable *, Eterm, Eterm *); static void db_print_catree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); @@ -1843,7 +1850,8 @@ static int db_slot_catree(Process *p, DbTable *tbl, static int db_select_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; @@ -1856,7 +1864,8 @@ static int db_select_continue_catree(Process *p, } static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret) + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1871,7 +1880,8 @@ static int db_select_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_count_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; @@ -1885,7 +1895,8 @@ static int db_select_count_continue_catree(Process *p, } static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1899,7 +1910,8 @@ static int db_select_count_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret) + int reversed, Eterm *ret, + enum DbIterSafety safety) { int result; CATreeRootIterator iter; @@ -1915,7 +1927,8 @@ static int db_select_chunk_catree(Process *p, DbTable *tbl, Eterm tid, static int db_select_delete_continue_catree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; @@ -1931,7 +1944,8 @@ static int db_select_delete_continue_catree(Process *p, } static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTreeStack stack; TreeDbTerm * stack_array[STACK_NEED]; @@ -1948,7 +1962,8 @@ static int db_select_delete_catree(Process *p, DbTable *tbl, Eterm tid, } static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety_p) { int result; CATreeRootIterator iter; @@ -1961,7 +1976,8 @@ static int db_select_replace_catree(Process *p, DbTable *tbl, Eterm tid, } static int db_select_replace_continue_catree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret) + Eterm continuation, Eterm *ret, + enum DbIterSafety* safety_p) { int result; CATreeRootIterator iter; diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index 426c7d2d48..fac4703620 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -404,26 +404,31 @@ static int db_slot_hash(Process *p, DbTable *tbl, static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reverse, Eterm *ret); + int reverse, Eterm *ret, enum DbIterSafety); static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret); + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety); static int db_select_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_count_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); - + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_replace_continue_hash(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_hash(Process *, DbTable *, Eterm, Eterm *); static void db_print_hash(fmtfn_t to, @@ -535,7 +540,7 @@ DbTableMethod db_hash = db_select_chunk_hash, db_select_hash, db_select_delete_hash, - db_select_continue_hash, /* hmm continue_hash? */ + db_select_continue_hash, db_select_delete_continue_hash, db_select_count_hash, db_select_count_continue_hash, @@ -1154,8 +1159,9 @@ static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) * Match traversal callbacks */ -typedef struct match_callbacks_t_ match_callbacks_t; -struct match_callbacks_t_ + +typedef struct traverse_context_t_ traverse_context_t; +struct traverse_context_t_ { /* Called when no match is possible. * context_ptr: Pointer to context @@ -1163,7 +1169,7 @@ struct match_callbacks_t_ * * Both the direct return value and 'ret' are used as the traversal function return values. */ - int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret); + int (*on_nothing_can_match)(traverse_context_t* ctx, Eterm* ret); /* Called for each match result. * context_ptr: Pointer to context @@ -1174,7 +1180,7 @@ struct match_callbacks_t_ * * Should return 1 for successful match, 0 otherwise. */ - int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix, + int (*on_match_res)(traverse_context_t* ctx, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res); /* Called when either we've matched enough elements in this cycle or EOT was reached. @@ -1188,7 +1194,7 @@ struct match_callbacks_t_ * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ - int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got, + int (*on_loop_ended)(traverse_context_t* ctx, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret); /* Called when it's time to trap @@ -1201,16 +1207,21 @@ struct match_callbacks_t_ * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ - int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp, + int (*on_trap)(traverse_context_t* ctx, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret); + Process* p; + DbTableHash* tb; + Eterm tid; + Eterm* prev_continuation_tptr; + enum DbIterSafety safety; }; /* * Begin hash table match traversal */ -static int match_traverse(Process* p, DbTableHash* tb, +static int match_traverse(traverse_context_t* ctx, Eterm pattern, extra_match_validator_t extra_match_validator, /* Optional */ Sint chunk_size, /* If 0, no chunking */ @@ -1218,9 +1229,9 @@ static int match_traverse(Process* p, DbTableHash* tb, Eterm** hpp, /* Heap */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ - match_callbacks_t* ctx, Eterm* ret) { + DbTableHash* tb = ctx->tb; Sint slot_ix; /* Slot index */ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or * the 'next' pointer in the previous term @@ -1287,7 +1298,7 @@ static int match_traverse(Process* p, DbTableHash* tb, for(;;) { if (*current_ptr != NULL) { if (!is_pseudo_deleted(*current_ptr)) { - match_res = db_match_dbterm(&tb->common, p, mpi.mp, + match_res = db_match_dbterm(&tb->common, ctx->p, mpi.mp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { @@ -1352,7 +1363,7 @@ done: /* * Continue hash table match traversal */ -static int match_traverse_continue(Process* p, DbTableHash* tb, +static int match_traverse_continue(traverse_context_t* ctx, Sint chunk_size, /* If 0, no chunking */ Sint iterations_left, /* Nr. of iterations left */ Eterm** hpp, /* Heap */ @@ -1361,9 +1372,9 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, Binary** mpp, /* Existing match program */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ - match_callbacks_t* ctx, Eterm* ret) { + DbTableHash* tb = ctx->tb; HashDbTerm** current_ptr; /* Refers to either the bucket pointer or * the 'next' pointer in the previous term */ @@ -1406,7 +1417,7 @@ static int match_traverse_continue(Process* p, DbTableHash* tb, for(;;) { if (*current_ptr != NULL) { if (!is_pseudo_deleted(*current_ptr)) { - match_res = db_match_dbterm(&tb->common, p, *mpp, + match_res = db_match_dbterm(&tb->common, ctx->p, *mpp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { @@ -1456,52 +1467,50 @@ done: */ static ERTS_INLINE int on_simple_trap(Export* trap_function, - Process* p, - DbTableHash* tb, - Eterm tid, - Eterm* prev_continuation_tptr, - Sint slot_ix, - Sint got, - Binary** mpp, - Eterm* ret) + traverse_context_t* ctx, + Sint slot_ix, + Sint got, + Binary** mpp, + Eterm* ret) { Eterm* hp; Eterm egot; Eterm mpb; Eterm continuation; - int is_first_trap = (prev_continuation_tptr == NULL); + int is_first_trap = (ctx->prev_continuation_tptr == NULL); size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0); - BUMP_ALL_REDS(p); + BUMP_ALL_REDS(ctx->p); if (IS_USMALL(0, got)) { - hp = HAllocX(p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE); + hp = HAllocX(ctx->p, base_halloc_sz + 6, ERTS_MAGIC_REF_THING_SIZE); egot = make_small(got); } else { - hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5, + hp = HAllocX(ctx->p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 6, ERTS_MAGIC_REF_THING_SIZE); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } if (is_first_trap) { - if (is_atom(tid)) - tid = erts_db_make_tid(p, &tb->common); - mpb = erts_db_make_match_prog_ref(p, *mpp, &hp); + if (is_atom(ctx->tid)) + ctx->tid = erts_db_make_tid(ctx->p, &ctx->tb->common); + mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp); *mpp = NULL; /* otherwise the caller will destroy it */ } else { - ASSERT(!is_atom(tid)); - mpb = prev_continuation_tptr[3]; + ASSERT(!is_atom(ctx->tid)); + mpb = ctx->prev_continuation_tptr[3]; } - continuation = TUPLE4( + continuation = TUPLE5( hp, - tid, + ctx->tid, make_small(slot_ix), mpb, - egot); - ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation); + egot, + make_small(ctx->safety)); + ERTS_BIF_PREP_TRAP1(*ret, trap_function, ctx->p, continuation); return DB_ERROR_NONE; } @@ -1510,17 +1519,18 @@ static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, Eterm* tid_ptr, Sint* slot_ix_p, Binary** mpp, - Sint* got_p) + Sint* got_p, + enum DbIterSafety* safety_p) { Eterm* tptr; ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); - if (arityval(*tptr) != 4) + if (*tptr != make_arityval(5)) return 1; - if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) { + if (!is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4])) + || !is_small(tptr[5])) return 1; - } *tptr_ptr = tptr; *tid_ptr = tptr[1]; @@ -1532,6 +1542,7 @@ static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, else { *got_p = unsigned_val(tptr[4]); } + *safety_p = signed_val(tptr[5]); return 0; } @@ -1545,24 +1556,20 @@ static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, #define MAX_SELECT_CHUNK_ITERATIONS 1000 typedef struct { - match_callbacks_t base; - Process* p; - DbTableHash* tb; - Eterm tid; + traverse_context_t base; Eterm* hp; Sint chunk_size; Eterm match_list; - Eterm* prev_continuation_tptr; } select_chunk_context_t; -static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) +static int select_chunk_on_nothing_can_match(traverse_context_t* ctx_base, Eterm* ret) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; *ret = (ctx->chunk_size > 0 ? am_EOT : NIL); return DB_ERROR_NONE; } -static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, +static int select_chunk_on_match_res(traverse_context_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { @@ -1574,7 +1581,7 @@ static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, return 0; } -static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, +static int select_chunk_on_loop_ended(traverse_context_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) @@ -1590,7 +1597,7 @@ static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, } else { ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS); - BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + BUMP_REDS(ctx->base.p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); if (ctx->chunk_size) { Eterm continuation; Eterm rest = NIL; @@ -1609,14 +1616,14 @@ static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, been in 'user space' */ } if (rest != NIL || slot_ix >= 0) { /* Need more calls */ - Eterm tid = ctx->tid; - ctx->hp = HAllocX(ctx->p, + Eterm tid = ctx->base.tid; + ctx->hp = HAllocX(ctx->base.p, 3 + 7 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); - mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp); + mpb = erts_db_make_match_prog_ref(ctx->base.p, *mpp, &ctx->hp); if (is_atom(tid)) - tid = erts_db_make_tid(ctx->p, - &ctx->tb->common); + tid = erts_db_make_tid(ctx->base.p, + &ctx->base.tb->common); continuation = TUPLE6( ctx->hp, tid, @@ -1631,7 +1638,7 @@ static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, } else { /* All data is exhausted */ if (ctx->match_list != NIL) { /* No more data to search but still a result to return to the caller */ - ctx->hp = HAlloc(ctx->p, 3); + ctx->hp = HAlloc(ctx->base.p, 3); *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { /* Reached the end of the ttable with no data to return */ @@ -1645,7 +1652,7 @@ static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, } } -static int select_chunk_on_trap(match_callbacks_t* ctx_base, +static int select_chunk_on_trap(traverse_context_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { @@ -1654,74 +1661,77 @@ static int select_chunk_on_trap(match_callbacks_t* ctx_base, Eterm continuation; Eterm* hp; - BUMP_ALL_REDS(ctx->p); + BUMP_ALL_REDS(ctx->base.p); - if (ctx->prev_continuation_tptr == NULL) { - Eterm tid = ctx->tid; + if (ctx->base.prev_continuation_tptr == NULL) { + Eterm tid = ctx->base.tid; /* First time we're trapping */ - hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE, + hp = HAllocX(ctx->base.p, 8 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); if (is_atom(tid)) - tid = erts_db_make_tid(ctx->p, &ctx->tb->common); - mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp); - continuation = TUPLE6( + tid = erts_db_make_tid(ctx->base.p, &ctx->base.tb->common); + mpb = erts_db_make_match_prog_ref(ctx->base.p, *mpp, &hp); + continuation = TUPLE7( hp, tid, make_small(slot_ix), make_small(ctx->chunk_size), mpb, ctx->match_list, - make_small(got)); + make_small(got), + make_small(ctx->base.safety)); *mpp = NULL; /* otherwise the caller will destroy it */ } else { /* Not the first time we're trapping; reuse continuation terms */ - hp = HAlloc(ctx->p, 7); - continuation = TUPLE6( + hp = HAlloc(ctx->base.p, 8); + continuation = TUPLE7( hp, - ctx->prev_continuation_tptr[1], + ctx->base.prev_continuation_tptr[1], make_small(slot_ix), - ctx->prev_continuation_tptr[3], - ctx->prev_continuation_tptr[4], + ctx->base.prev_continuation_tptr[3], + ctx->base.prev_continuation_tptr[4], ctx->match_list, - make_small(got)); + make_small(got), + make_small(ctx->base.safety)); } - ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p, + ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->base.p, continuation); return DB_ERROR_NONE; } static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, - int reverse, Eterm *ret) + int reverse, Eterm *ret, enum DbIterSafety safety) { - return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret); + return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret, safety); } static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reverse, Eterm *ret) + int reverse, Eterm *ret, enum DbIterSafety safety) { select_chunk_context_t ctx; ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match; ctx.base.on_match_res = select_chunk_on_match_res; ctx.base.on_loop_ended = select_chunk_on_loop_ended; - ctx.base.on_trap = select_chunk_on_trap, - ctx.p = p; - ctx.tb = &tbl->hash; - ctx.tid = tid; + ctx.base.on_trap = select_chunk_on_trap; + ctx.base.p = p; + ctx.base.tb = &tbl->hash; + ctx.base.tid = tid; + ctx.base.prev_continuation_tptr = NULL; + ctx.base.safety = safety; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = NIL; - ctx.prev_continuation_tptr = NULL; return match_traverse( - ctx.p, ctx.tb, + &ctx.base, pattern, NULL, ctx.chunk_size, MAX_SELECT_CHUNK_ITERATIONS, &ctx.hp, 0, - &ctx.base, ret); + ret); } /* @@ -1731,7 +1741,7 @@ static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, */ static -int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, +int select_chunk_continue_on_loop_ended(traverse_context_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) @@ -1742,14 +1752,14 @@ int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, Eterm* hp; ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS); - BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); + BUMP_REDS(ctx->base.p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); if (ctx->chunk_size) { Sint rest_size = 0; if (got > ctx->chunk_size) { /* Cannot write destructively here, the list may have been in user space */ - hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2); + hp = HAlloc(ctx->base.p, (got - ctx->chunk_size) * 2); while (got-- > ctx->chunk_size) { rest = CONS(hp, CAR(list_val(ctx->match_list)), rest); hp += 2; @@ -1758,13 +1768,13 @@ int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, } } if (rest != NIL || slot_ix >= 0) { - hp = HAlloc(ctx->p, 3 + 7); + hp = HAlloc(ctx->base.p, 3 + 7); continuation = TUPLE6( hp, - ctx->prev_continuation_tptr[1], + ctx->base.prev_continuation_tptr[1], make_small(slot_ix), - ctx->prev_continuation_tptr[3], - ctx->prev_continuation_tptr[4], + ctx->base.prev_continuation_tptr[3], + ctx->base.prev_continuation_tptr[4], rest, make_small(rest_size)); hp += 7; @@ -1772,7 +1782,7 @@ int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, return DB_ERROR_NONE; } else { if (ctx->match_list != NIL) { - hp = HAlloc(ctx->p, 3); + hp = HAlloc(ctx->base.p, 3); *ret = TUPLE2(hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { @@ -1786,10 +1796,11 @@ int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, } /* - * This is called when select traps + * This is called when ets:select/1/2/3 traps + * and for ets:select/1 with user continuation term. */ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, - Eterm* ret) + Eterm* ret, enum DbIterSafety* safety_p) { select_chunk_context_t ctx; Eterm* tptr; @@ -1805,7 +1816,13 @@ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); - if (arityval(*tptr) != 6) + /* + * 6-tuple is select/1 user continuation term + * 7-tuple is select trap continuation + */ + if (*tptr == make_arityval(7) && is_small(tptr[7])) + *safety_p = signed_val(tptr[7]); + else if (*tptr != make_arityval(6)) goto badparam; if (!is_small(tptr[2]) || !is_small(tptr[3]) || @@ -1829,18 +1846,19 @@ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, ctx.base.on_match_res = select_chunk_on_match_res; ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended; ctx.base.on_trap = select_chunk_on_trap; - ctx.p = p; - ctx.tb = &tbl->hash; - ctx.tid = tid; + ctx.base.p = p; + ctx.base.tb = &tbl->hash; + ctx.base.tid = tid; + ctx.base.prev_continuation_tptr = tptr; + ctx.base.safety = *safety_p; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = match_list; - ctx.prev_continuation_tptr = tptr; return match_traverse_continue( - ctx.p, ctx.tb, ctx.chunk_size, - iterations_left, &ctx.hp, slot_ix, got, &mp, 0, - &ctx.base, ret); + &ctx.base, ctx.chunk_size, + iterations_left, &ctx.hp, slot_ix, got, &mp, 0, + ret); badparam: *ret = NIL; @@ -1858,84 +1876,73 @@ badparam: #define MAX_SELECT_COUNT_ITERATIONS 1000 -typedef struct { - match_callbacks_t base; - Process* p; - DbTableHash* tb; - Eterm tid; - Eterm* prev_continuation_tptr; -} select_count_context_t; - -static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base, +static int select_count_on_nothing_can_match(traverse_context_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } -static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, +static int select_count_on_match_res(traverse_context_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { return (match_res == am_true); } -static int select_count_on_loop_ended(match_callbacks_t* ctx_base, +static int select_count_on_loop_ended(traverse_context_t* ctx, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { - select_count_context_t* ctx = (select_count_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS); BUMP_REDS(ctx->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left); *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } -static int select_count_on_trap(match_callbacks_t* ctx_base, +static int select_count_on_trap(traverse_context_t* ctx, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { - select_count_context_t* ctx = (select_count_context_t*) ctx_base; return on_simple_trap( - &ets_select_count_continue_exp, - ctx->p, - ctx->tb, - ctx->tid, - ctx->prev_continuation_tptr, + &ets_select_count_continue_exp, ctx, slot_ix, got, mpp, ret); } static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { - select_count_context_t ctx; + traverse_context_t ctx; Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS; Sint chunk_size = 0; - ctx.base.on_nothing_can_match = select_count_on_nothing_can_match; - ctx.base.on_match_res = select_count_on_match_res; - ctx.base.on_loop_ended = select_count_on_loop_ended; - ctx.base.on_trap = select_count_on_trap; + ctx.on_nothing_can_match = select_count_on_nothing_can_match; + ctx.on_match_res = select_count_on_match_res; + ctx.on_loop_ended = select_count_on_loop_ended; + ctx.on_trap = select_count_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; + ctx.safety = safety; return match_traverse( - ctx.p, ctx.tb, + &ctx, pattern, NULL, chunk_size, iterations_left, NULL, 0, - &ctx.base, ret); + ret); } /* * This is called when select_count traps */ static int db_select_count_continue_hash(Process* p, DbTable* tbl, - Eterm continuation, Eterm* ret) + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { - select_count_context_t ctx; + traverse_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; @@ -1944,24 +1951,26 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } - ctx.base.on_match_res = select_count_on_match_res; - ctx.base.on_loop_ended = select_count_on_loop_ended; - ctx.base.on_trap = select_count_on_trap; + ctx.on_match_res = select_count_on_match_res; + ctx.on_loop_ended = select_count_on_loop_ended; + ctx.on_trap = select_count_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; + ctx.safety = *safety_p; return match_traverse_continue( - ctx.p, ctx.tb, chunk_size, + &ctx, chunk_size, MAX_SELECT_COUNT_ITERATIONS, NULL, slot_ix, got, &mp, 0, - &ctx.base, ret); + ret); } #undef MAX_SELECT_COUNT_ITERATIONS @@ -1976,24 +1985,20 @@ static int db_select_count_continue_hash(Process* p, DbTable* tbl, #define MAX_SELECT_DELETE_ITERATIONS 1000 typedef struct { - match_callbacks_t base; - Process* p; - DbTableHash* tb; - Eterm tid; - Eterm* prev_continuation_tptr; + traverse_context_t base; erts_aint_t fixated_by_me; Uint last_pseudo_delete; HashDbTerm* free_us; } select_delete_context_t; -static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base, +static int select_delete_on_nothing_can_match(traverse_context_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } -static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, +static int select_delete_on_match_res(traverse_context_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { @@ -2003,9 +2008,9 @@ static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, if (match_res != am_true) return 0; - if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */ + if (NFIXED(ctx->base.tb) > ctx->fixated_by_me) { /* fixated by others? */ if (slot_ix != ctx->last_pseudo_delete) { - if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me)) + if (!add_fixed_deletion(ctx->base.tb, slot_ix, ctx->fixated_by_me)) goto do_erase; ctx->last_pseudo_delete = slot_ix; } @@ -2018,46 +2023,43 @@ static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, del->next = ctx->free_us; ctx->free_us = del; } - erts_atomic_dec_nob(&ctx->tb->common.nitems); + erts_atomic_dec_nob(&ctx->base.tb->common.nitems); return 1; } -static int select_delete_on_loop_ended(match_callbacks_t* ctx_base, +static int select_delete_on_loop_ended(traverse_context_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; - free_term_list(ctx->tb, ctx->free_us); + free_term_list(ctx->base.tb, ctx->free_us); ctx->free_us = NULL; ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS); - BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); + BUMP_REDS(ctx->base.p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); if (got) { - try_shrink(ctx->tb); + try_shrink(ctx->base.tb); } - *ret = erts_make_integer(got, ctx->p); + *ret = erts_make_integer(got, ctx->base.p); return DB_ERROR_NONE; } -static int select_delete_on_trap(match_callbacks_t* ctx_base, +static int select_delete_on_trap(traverse_context_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; - free_term_list(ctx->tb, ctx->free_us); + free_term_list(ctx->base.tb, ctx->free_us); ctx->free_us = NULL; return on_simple_trap( - &ets_select_delete_continue_exp, - ctx->p, - ctx->tb, - ctx->tid, - ctx->prev_continuation_tptr, + &ets_select_delete_continue_exp, &ctx->base, slot_ix, got, mpp, ret); } static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { select_delete_context_t ctx; Sint chunk_size = 0; @@ -2066,27 +2068,29 @@ static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, ctx.base.on_match_res = select_delete_on_match_res; ctx.base.on_loop_ended = select_delete_on_loop_ended; ctx.base.on_trap = select_delete_on_trap; - ctx.p = p; - ctx.tb = &tbl->hash; - ctx.tid = tid; - ctx.prev_continuation_tptr = NULL; - ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */ + ctx.base.p = p; + ctx.base.tb = &tbl->hash; + ctx.base.tid = tid; + ctx.base.prev_continuation_tptr = NULL; + ctx.base.safety = safety; + ctx.fixated_by_me = ctx.base.tb->common.is_thread_safe ? 0 : 1; ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; return match_traverse( - ctx.p, ctx.tb, + &ctx.base, pattern, NULL, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, 1, - &ctx.base, ret); + ret); } /* * This is called when select_delete traps */ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, - Eterm continuation, Eterm* ret) + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { select_delete_context_t ctx; Eterm* tptr; @@ -2096,7 +2100,8 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Sint slot_ix; Sint chunk_size = 0; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } @@ -2104,19 +2109,20 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, ctx.base.on_match_res = select_delete_on_match_res; ctx.base.on_loop_ended = select_delete_on_loop_ended; ctx.base.on_trap = select_delete_on_trap; - ctx.p = p; - ctx.tb = &tbl->hash; - ctx.tid = tid; - ctx.prev_continuation_tptr = tptr; - ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */ + ctx.base.p = p; + ctx.base.tb = &tbl->hash; + ctx.base.tid = tid; + ctx.base.prev_continuation_tptr = tptr; + ctx.base.safety = *safety_p; + ctx.fixated_by_me = ONLY_WRITER(p, ctx.base.tb) ? 0 : 1; ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; return match_traverse_continue( - ctx.p, ctx.tb, chunk_size, + &ctx.base, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, slot_ix, got, &mp, 1, - &ctx.base, ret); + ret); } #undef MAX_SELECT_DELETE_ITERATIONS @@ -2130,26 +2136,17 @@ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, #define MAX_SELECT_REPLACE_ITERATIONS 1000 -typedef struct { - match_callbacks_t base; - Process* p; - DbTableHash* tb; - Eterm tid; - Eterm* prev_continuation_tptr; -} select_replace_context_t; - -static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base, +static int select_replace_on_nothing_can_match(traverse_context_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } -static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, +static int select_replace_on_match_res(traverse_context_t* ctx, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { - select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; DbTableHash* tb = ctx->tb; HashDbTerm* new; HashDbTerm* next; @@ -2175,11 +2172,10 @@ static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix return 0; } -static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, +static int select_replace_on_loop_ended(traverse_context_t* ctx, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { - select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS); /* the more objects we've replaced, the more reductions we've consumed */ BUMP_REDS(ctx->p, @@ -2189,23 +2185,20 @@ static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_i return DB_ERROR_NONE; } -static int select_replace_on_trap(match_callbacks_t* ctx_base, +static int select_replace_on_trap(traverse_context_t* ctx, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { - select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; return on_simple_trap( - &ets_select_replace_continue_exp, - ctx->p, - ctx->tb, - ctx->tid, - ctx->prev_continuation_tptr, + &ets_select_replace_continue_exp, ctx, slot_ix, got, mpp, ret); } -static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) +static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { - select_replace_context_t ctx; + traverse_context_t ctx; Sint chunk_size = 0; /* Bag implementation presented both semantic consistency and performance issues, @@ -2213,29 +2206,31 @@ static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pat */ ASSERT(!(tbl->hash.common.status & DB_BAG)); - ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match; - ctx.base.on_match_res = select_replace_on_match_res; - ctx.base.on_loop_ended = select_replace_on_loop_ended; - ctx.base.on_trap = select_replace_on_trap; + ctx.on_nothing_can_match = select_replace_on_nothing_can_match; + ctx.on_match_res = select_replace_on_match_res; + ctx.on_loop_ended = select_replace_on_loop_ended; + ctx.on_trap = select_replace_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; return match_traverse( - ctx.p, ctx.tb, + &ctx, pattern, db_match_keeps_key, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, 1, - &ctx.base, ret); + ret); } /* * This is called when select_replace traps */ -static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) +static int db_select_replace_continue_hash(Process* p, DbTable* tbl, + Eterm continuation, Eterm* ret, + enum DbIterSafety* safety_p) { - select_replace_context_t ctx; + traverse_context_t ctx; Eterm* tptr; Eterm tid ; Binary* mp; @@ -2244,25 +2239,27 @@ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm conti Sint chunk_size = 0; *ret = NIL; - if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { + if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, + &got, safety_p)) { *ret = NIL; return DB_ERROR_BADPARAM; } /* Proceed */ - ctx.base.on_match_res = select_replace_on_match_res; - ctx.base.on_loop_ended = select_replace_on_loop_ended; - ctx.base.on_trap = select_replace_on_trap; + ctx.on_match_res = select_replace_on_match_res; + ctx.on_loop_ended = select_replace_on_loop_ended; + ctx.on_trap = select_replace_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; + ctx.safety = *safety_p; return match_traverse_continue( - ctx.p, ctx.tb, chunk_size, + &ctx, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, slot_ix, got, &mp, 1, - &ctx.base, ret); + ret); } diff --git a/erts/emulator/beam/erl_db_tree.c b/erts/emulator/beam/erl_db_tree.c index fe57348700..f9ba04f399 100644 --- a/erts/emulator/beam/erl_db_tree.c +++ b/erts/emulator/beam/erl_db_tree.c @@ -397,24 +397,31 @@ static int db_erase_object_tree(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_tree(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reversed, Eterm *ret); + Eterm pattern, int reversed, Eterm *ret, + enum DbIterSafety); static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, enum DbIterSafety); static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, - int reversed, Eterm *ret); + int reversed, Eterm *ret, enum DbIterSafety); static int db_select_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_count_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_delete_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret); + Eterm pattern, Eterm *ret, + enum DbIterSafety); static int db_select_replace_continue_tree(Process *p, DbTable *tbl, - Eterm continuation, Eterm *ret); + Eterm continuation, Eterm *ret, + enum DbIterSafety*); static int db_take_tree(Process *, DbTable *, Eterm, Eterm *); static void db_print_tree(fmtfn_t to, void *to_arg, int show, DbTable *tbl); @@ -1160,7 +1167,8 @@ int db_select_continue_tree_common(Process *p, static int db_select_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; return db_select_continue_tree_common(p, &tb->common, @@ -1297,7 +1305,8 @@ int db_select_tree_common(Process *p, DbTable *tb, } static int db_select_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, int reverse, Eterm *ret) + Eterm pattern, int reverse, Eterm *ret, + enum DbIterSafety safety) { return db_select_tree_common(p, tbl, tid, pattern, reverse, ret, &tbl->tree, NULL); @@ -1408,7 +1417,8 @@ int db_select_count_continue_tree_common(Process *p, static int db_select_count_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; return db_select_count_continue_tree_common(p, tbl, @@ -1527,7 +1537,8 @@ int db_select_count_tree_common(Process *p, DbTable *tb, } static int db_select_count_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_count_tree_common(p, tbl, @@ -1704,7 +1715,7 @@ int db_select_chunk_tree_common(Process *p, DbTable *tb, static int db_select_chunk_tree(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, - Eterm *ret) + Eterm *ret, enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_chunk_tree_common(p, tbl, @@ -1813,7 +1824,8 @@ int db_select_delete_continue_tree_common(Process *p, static int db_select_delete_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { DbTableTree *tb = &tbl->tree; ASSERT(!erts_atomic_read_nob(&tb->is_stack_busy)); @@ -1942,7 +1954,8 @@ int db_select_delete_tree_common(Process *p, DbTable *tbl, } static int db_select_delete_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { DbTableTree *tb = &tbl->tree; return db_select_delete_tree_common(p, tbl, tid, pattern, ret, @@ -2052,7 +2065,8 @@ int db_select_replace_continue_tree_common(Process *p, static int db_select_replace_continue_tree(Process *p, DbTable *tbl, Eterm continuation, - Eterm *ret) + Eterm *ret, + enum DbIterSafety* safety_p) { return db_select_replace_continue_tree_common(p, tbl, continuation, ret, &tbl->tree, NULL); @@ -2177,7 +2191,8 @@ int db_select_replace_tree_common(Process *p, DbTable *tbl, } static int db_select_replace_tree(Process *p, DbTable *tbl, Eterm tid, - Eterm pattern, Eterm *ret) + Eterm pattern, Eterm *ret, + enum DbIterSafety safety) { return db_select_replace_tree_common(p, tbl, tid, pattern, ret, &tbl->tree, NULL); diff --git a/erts/emulator/beam/erl_db_util.h b/erts/emulator/beam/erl_db_util.h index e1af9210ea..e3d3c0e804 100644 --- a/erts/emulator/beam/erl_db_util.h +++ b/erts/emulator/beam/erl_db_util.h @@ -101,6 +101,14 @@ typedef struct { } u; } DbUpdateHandle; +/* How safe are we from double-hits or missed objects + * when iterating without fixation? + */ +enum DbIterSafety { + ITER_UNSAFE, /* Must fixate to be safe */ + ITER_SAFE_LOCKED, /* Safe while table is locked, not between trap calls */ + ITER_SAFE /* No need to fixate at all */ +}; typedef struct db_table_method { @@ -150,44 +158,53 @@ typedef struct db_table_method Eterm pattern, Sint chunk_size, int reverse, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, int reverse, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_delete)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_delete_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_count)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_count_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_select_replace)(Process* p, DbTable* tb, /* [in out] */ Eterm tid, Eterm pattern, - Eterm* ret); + Eterm* ret, + enum DbIterSafety); int (*db_select_replace_continue)(Process* p, DbTable* tb, /* [in out] */ Eterm continuation, - Eterm* ret); + Eterm* ret, + enum DbIterSafety*); int (*db_take)(Process *, DbTable *, Eterm, Eterm *); SWord (*db_delete_all_objects)(Process* p, DbTable* db, SWord reds); diff --git a/lib/stdlib/doc/src/ets.xml b/lib/stdlib/doc/src/ets.xml index d2ac6a75b1..2cb677785d 100644 --- a/lib/stdlib/doc/src/ets.xml +++ b/lib/stdlib/doc/src/ets.xml @@ -642,12 +642,11 @@ Error: fun containing local Erlang function calls <p><marker id="info_2_safe_fixed_monotonic_time"/></p> <p><c>Item=safe_fixed|safe_fixed_monotonic_time, Value={FixationTime,Info}|false</c></p> - <p>If the table has been fixed using + <p>If the table is fixed using <seealso marker="#safe_fixtable/2"> <c>safe_fixtable/2</c></seealso>, the call returns a tuple where <c>FixationTime</c> is the - time when the table was first fixed by a process, which either - is or is not one of the processes it is fixed by now.</p> + last time when the table changed from unfixed to fixed.</p> <p>The format and value of <c>FixationTime</c> depends on <c>Item</c>:</p> <taglist> @@ -679,8 +678,15 @@ Error: fun containing local Erlang function calls table is fixed by now. <c>RefCount</c> is the value of the reference counter and it keeps track of how many times the table has been fixed by the process.</p> - <p>If the table never has been fixed, the call returns - <c>false</c>.</p> + <p>Table fixations are not limited to <seealso marker="#safe_fixtable/2"> + <c>safe_fixtable/2</c></seealso>. Temporary fixations may also + be done by for example <seealso marker="#traversal">traversing + functions</seealso> like <c>select</c> and <c>match</c>. Such + table fixations are automatically released before the + corresponding functions returns, but they may be seen by a + concurrent call to + <c>ets:info(T,safe_fixed|safe_fixed_monotonic_time)</c>.</p> + <p>If the table is not fixed at all, the call returns <c>false</c>.</p> </item> <item> <p><c>Item=stats, Value=tuple()</c></p> diff --git a/lib/stdlib/test/ets_SUITE.erl b/lib/stdlib/test/ets_SUITE.erl index 7703198c4c..8561491d50 100644 --- a/lib/stdlib/test/ets_SUITE.erl +++ b/lib/stdlib/test/ets_SUITE.erl @@ -56,6 +56,7 @@ -export([t_match_spec_run/1]). -export([t_bucket_disappears/1]). -export([t_named_select/1]). +-export([select_fixtab_owner_change/1]). -export([otp_5340/1]). -export([otp_6338/1]). -export([otp_6842_select_1000/1]). @@ -130,7 +131,7 @@ all() -> t_insert_list, t_test_ms, t_select_delete, t_select_replace, t_select_replace_next_bug, t_ets_dets, memory, t_select_reverse, t_bucket_disappears, - t_named_select, + t_named_select, select_fixtab_owner_change, select_fail, t_insert_new, t_repair_continuation, otp_5340, otp_6338, otp_6842_select_1000, otp_7665, select_mbuf_trapping, @@ -249,7 +250,64 @@ t_named_select_do(Opts) -> verify_etsmem(EtsMem). +%% Verify select and friends release fixtab as they should +%% even when owneship is changed between traps. +select_fixtab_owner_change(_Config) -> + T = ets:new(xxx, [protected]), + NKeys = 2000, + [ets:insert(T,{K,K band 7}) || K <- lists:seq(1,NKeys)], + %% Buddy and Papa will ping-pong table ownership between them + %% and the aim is to give Buddy the table when he is + %% in the middle of a yielding select* call. + {Buddy,_} = spawn_opt(fun() -> sfoc_buddy_loop(T, 1, undefined) end, + [link,monitor]), + + sfoc_papa_loop(T, Buddy), + + receive {'DOWN', _, process, Buddy, _} -> ok end, + ets:delete(T), + ok. + +sfoc_buddy_loop(T, I, State0) -> + receive + {'ETS-TRANSFER', T, Papa, _} -> + ets:give_away(T, Papa, State0), + case State0 of + done -> + ok; + _ -> + State1 = sfoc_traverse(T, I, State0), + %% Verify no fixation left + {I, false} = {I, ets:info(T, safe_fixed_monotonic_time)}, + sfoc_buddy_loop(T, I+1, State1) + end + end. + +sfoc_papa_loop(T, Buddy) -> + ets:give_away(T, Buddy, "Catch!"), + receive + {'ETS-TRANSFER', T, Buddy, State} -> + case State of + done -> + ok; + _ -> + sfoc_papa_loop(T, Buddy) + end + end. + +sfoc_traverse(T, 1, S) -> + ets:select(T, [{{'$1',7}, [], ['$1']}]), S; +sfoc_traverse(T, 2, S) -> + 0 = ets:select_count(T, [{{'$1',7}, [], [false]}]), S; +sfoc_traverse(T, 3, _) -> + Limit = ets:info(T, size) div 2, + {_, Continuation} = ets:select(T, [{{'$1',7}, [], ['$1']}], + Limit), + Continuation; +sfoc_traverse(_T, 4, Continuation) -> + _ = ets:select(Continuation), + done. %% Check ets:match_spec_run/2. t_match_spec_run(Config) when is_list(Config) -> |