aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_db_hash.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_db_hash.c')
-rw-r--r--erts/emulator/beam/erl_db_hash.c2574
1 files changed, 1376 insertions, 1198 deletions
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index 390369fdb9..47d304f860 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1998-2016. All Rights Reserved.
+ * Copyright Ericsson AB 1998-2017. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -84,25 +84,28 @@
#include "erl_db_hash.h"
-#ifdef MYDEBUG /* Will fail test case ets_SUITE:memory */
-# define IF_DEBUG(x) x
-# define MY_ASSERT(x) ASSERT(x)
-#else
-# define IF_DEBUG(x)
-# define MY_ASSERT(x)
-#endif
-
/*
* The following symbols can be manipulated to "tune" the linear hash array
*/
-#define CHAIN_LEN 6 /* Medium bucket chain len */
+#define GROW_LIMIT(NACTIVE) ((NACTIVE)*1)
+#define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2)
+
+/*
+** We want the first mandatory segment to be small (to reduce minimal footprint)
+** and larger extra segments (to reduce number of alloc/free calls).
+*/
-/* Number of slots per segment */
-#define SEGSZ_EXP 8
-#define SEGSZ (1 << SEGSZ_EXP)
-#define SEGSZ_MASK (SEGSZ-1)
+/* Number of slots in first segment */
+#define FIRST_SEGSZ_EXP 8
+#define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP)
+#define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1)
-#define NSEG_1 2 /* Size of first segment table (must be at least 2) */
+/* Number of slots per extra segment */
+#define EXT_SEGSZ_EXP 11
+#define EXT_SEGSZ (1 << EXT_SEGSZ_EXP)
+#define EXT_SEGSZ_MASK (EXT_SEGSZ-1)
+
+#define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*))
#define NSEG_2 256 /* Size of second segment table */
#define NSEG_INC 128 /* Number of segments to grow after that */
@@ -123,7 +126,9 @@
#define NACTIVE(tb) ((int)erts_smp_atomic_read_nob(&(tb)->nactive))
#define NITEMS(tb) ((int)erts_smp_atomic_read_nob(&(tb)->common.nitems))
-#define BUCKET(tb, i) SEGTAB(tb)[(i) >> SEGSZ_EXP]->buckets[(i) & SEGSZ_MASK]
+#define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP)
+
+#define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK]
/*
* When deleting a table, the number of records to delete.
@@ -184,11 +189,12 @@ static ERTS_INLINE int add_fixed_deletion(DbTableHash* tb, int ix,
/* optimised version of make_hash (normal case? atomic key) */
#define MAKE_HASH(term) \
((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \
- make_internal_hash(term)) % MAX_HASH)
+ make_internal_hash(term, 0)) % MAX_HASH)
#ifdef ERTS_SMP
# define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1)
# define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck)
+# define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval))
/* Fine grained read lock */
static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval)
@@ -318,27 +324,23 @@ struct mp_info {
/* A table segment */
struct segment {
- HashDbTerm* buckets[SEGSZ];
-#ifdef MYDEBUG
- int is_ext_segment;
-#endif
+ HashDbTerm* buckets[1];
};
+#define SIZEOF_SEGMENT(N) \
+ (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N))
-/* A segment that also contains a segment table */
-struct ext_segment {
- struct segment s; /* The segment itself. Must be first */
-
+/* An extended segment table */
+struct ext_segtab {
+#ifdef ERTS_SMP
+ ErtsThrPrgrLaterOp lop;
+#endif
struct segment** prev_segtab; /* Used when table is shrinking */
- int nsegs; /* Size of segtab */
+ int prev_nsegs; /* Size of prev_segtab */
+ int nsegs; /* Size of this segtab */
struct segment* segtab[1]; /* The segment table */
};
-#define SIZEOF_EXTSEG(NSEGS) \
- (offsetof(struct ext_segment,segtab) + sizeof(struct segment*)*(NSEGS))
-
-#if defined(DEBUG) || defined(VALGRIND)
-# define EXTSEG(SEGTAB_PTR) \
- ((struct ext_segment*) (((char*)(SEGTAB_PTR)) - offsetof(struct ext_segment,segtab)))
-#endif
+#define SIZEOF_EXT_SEGTAB(NSEGS) \
+ (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS))
static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
@@ -348,53 +350,28 @@ static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb,
erts_smp_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab);
else
erts_smp_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab);
-#ifdef VALGRIND
- tb->top_ptr_to_segment_with_active_segtab = EXTSEG(segtab);
-#endif
}
-
-/* How the table segments relate to each other:
-
- ext_segment: ext_segment: "plain" segment
- #=================# #================# #=============#
- | bucket[0] |<--+ +------->| bucket[256] | +->| bucket[512] |
- | bucket[1] | | | | [257] | | | [513] |
- : : | | : : | : :
- | bucket[255] | | | | [511] | | | [767] |
- |-----------------| | | |----------------| | #=============#
- | prev_segtab=NULL| | | +--<---prev_segtab | |
- | nsegs = 2 | | | | | nsegs = 256 | |
-+->| segtab[0] -->-------+---|---|--<---segtab[0] |<-+ |
-| | segtab[1] -->-----------+---|--<---segtab[1] | | |
-| #=================# | | segtab[2] -->-----|--+ ext_segment:
-| | : : | #================#
-+----------------<---------------+ | segtab[255] ->----|----->| bucket[255*256]|
- #================# | | |
- | : :
- | |----------------|
- +----<---prev_segtab |
- : :
-*/
-
+/* Used by select_replace on analyze_pattern */
+typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body);
/*
** Forward decl's (static functions)
*/
-static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
- struct segment** old_segtab);
-static int alloc_seg(DbTableHash *tb);
+static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix);
+static void alloc_seg(DbTableHash *tb);
static int free_seg(DbTableHash *tb, int free_records);
static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr,
HashDbTerm *list);
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
HashValue hval, HashDbTerm *list);
-static void shrink(DbTableHash* tb, int nactive);
-static void grow(DbTableHash* tb, int nactive);
+static void shrink(DbTableHash* tb, int nitems);
+static void grow(DbTableHash* tb, int nitems);
static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
Uint sz, DbTableHash*);
-static int analyze_pattern(DbTableHash *tb, Eterm pattern,
- struct mp_info *mpi);
+static int analyze_pattern(DbTableHash *tb, Eterm pattern,
+ extra_match_validator_t extra_validator, /* Optional callback */
+ struct mp_info *mpi);
/*
* Method interface functions
@@ -418,24 +395,29 @@ static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret);
static int db_slot_hash(Process *p, DbTable *tbl,
Eterm slot_term, Eterm *ret);
-static int db_select_chunk_hash(Process *p, DbTable *tbl,
+static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, Sint chunk_size,
int reverse, Eterm *ret);
-static int db_select_hash(Process *p, DbTable *tbl,
+static int db_select_hash(Process *p, DbTable *tbl, Eterm tid,
Eterm pattern, int reverse, Eterm *ret);
-static int db_select_count_hash(Process *p, DbTable *tbl,
- Eterm pattern, Eterm *ret);
-static int db_select_delete_hash(Process *p, DbTable *tbl,
- Eterm pattern, Eterm *ret);
-
-static int db_select_continue_hash(Process *p, DbTable *tbl,
+static int db_select_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
-static int db_select_count_continue_hash(Process *p, DbTable *tbl,
+static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_count_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
+static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
static int db_select_delete_continue_hash(Process *p, DbTable *tbl,
Eterm continuation, Eterm *ret);
+
+static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid,
+ Eterm pattern, Eterm *ret);
+static int db_select_replace_continue_hash(Process *p, DbTable *tbl,
+ Eterm continuation, Eterm *ret);
+
static int db_take_hash(Process *, DbTable *, Eterm, Eterm *);
static void db_print_hash(fmtfn_t to,
void *to_arg,
@@ -443,7 +425,7 @@ static void db_print_hash(fmtfn_t to,
DbTable *tbl);
static int db_free_table_hash(DbTable *tbl);
-static int db_free_table_continue_hash(DbTable *tbl);
+static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds);
static void db_foreach_offheap_hash(DbTable *,
@@ -463,9 +445,10 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle);
static ERTS_INLINE void try_shrink(DbTableHash* tb)
{
int nactive = NACTIVE(tb);
- if (nactive > SEGSZ && NITEMS(tb) < (nactive * CHAIN_LEN)
+ int nitems = NITEMS(tb);
+ if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)
&& !IS_FIXED(tb)) {
- shrink(tb, nactive);
+ shrink(tb, nitems);
}
}
@@ -547,17 +530,14 @@ DbTableMethod db_hash =
db_select_delete_continue_hash,
db_select_count_hash,
db_select_count_continue_hash,
+ db_select_replace_hash,
+ db_select_replace_continue_hash,
db_take_hash,
db_delete_all_objects_hash,
db_free_table_hash,
db_free_table_continue_hash,
db_print_hash,
db_foreach_offheap_hash,
-#ifdef HARDDEBUG
- db_check_table_hash,
-#else
- NULL,
-#endif
db_lookup_dbterm_hash,
db_finalize_dbterm_hash
};
@@ -605,9 +585,10 @@ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel)
** Table interface routines ie what's called by the bif's
*/
-void db_unfix_table_hash(DbTableHash *tb)
+SWord db_unfix_table_hash(DbTableHash *tb)
{
FixedDeletion* fixdel;
+ SWord work = 0;
ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock)
|| (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock)
@@ -628,7 +609,7 @@ restart:
if (!IS_FIXED(tb)) {
goto restart; /* unfixed again! */
}
- return;
+ return work;
}
if (ix < NACTIVE(tb)) {
bp = &BUCKET(tb, ix);
@@ -638,6 +619,7 @@ restart:
if (b->hvalue == INVALID_HASH) {
*bp = b->next;
free_term(tb, b);
+ work++;
b = *bp;
} else {
bp = &b->next;
@@ -653,25 +635,32 @@ restart:
(void *) fx,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+ work++;
}
/* ToDo: Maybe try grow/shrink the table as well */
+
+ return work;
}
int db_create_hash(Process *p, DbTable *tbl)
{
DbTableHash *tb = &tbl->hash;
- erts_smp_atomic_init_nob(&tb->szm, SEGSZ_MASK);
- erts_smp_atomic_init_nob(&tb->nactive, SEGSZ);
+ erts_smp_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK);
+ erts_smp_atomic_init_nob(&tb->nactive, FIRST_SEGSZ);
erts_smp_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL);
erts_smp_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL);
- SET_SEGTAB(tb, alloc_ext_seg(tb,0,NULL)->segtab);
+ SET_SEGTAB(tb, tb->first_segtab);
tb->nsegs = NSEG_1;
- tb->nslots = SEGSZ;
+ tb->nslots = FIRST_SEGSZ;
+ tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_SEGMENT(FIRST_SEGSZ));
+ sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ));
- erts_smp_atomic_init_nob(&tb->is_resizing, 0);
#ifdef ERTS_SMP
+ erts_smp_atomic_init_nob(&tb->is_resizing, 0);
if (tb->common.type & DB_FINE_LOCKED) {
erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER;
int i;
@@ -683,10 +672,10 @@ int db_create_hash(Process *p, DbTable *tbl)
(DbTable *) tb,
sizeof(DbTableHashFineLocks));
for (i=0; i<DB_HASH_LOCK_CNT; ++i) {
- erts_smp_rwmtx_init_opt_x(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
- "db_hash_slot", make_small(i));
+ erts_smp_rwmtx_init_opt(&tb->locks->lck_vec[i].lck, &rwmtx_opt,
+ "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB);
}
- /* This important property is needed to guarantee that the buckets
+ /* This important property is needed to guarantee the two buckets
* involved in a grow/shrink operation it protected by the same lock:
*/
ASSERT(erts_smp_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0);
@@ -862,11 +851,10 @@ Lnew:
WUNLOCK_HASH(lck);
{
int nactive = NACTIVE(tb);
- if (nitems > nactive * (CHAIN_LEN+1) && !IS_FIXED(tb)) {
- grow(tb, nactive);
+ if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
+ grow(tb, nitems);
}
}
- CHECK_TABLES();
return DB_ERROR_NONE;
Ldone:
@@ -891,7 +879,6 @@ get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval,
}
}
copy = build_term_list(p, b1, b2, sz, tb);
- CHECK_TABLES();
if (bend) {
*bend = b2;
}
@@ -923,70 +910,6 @@ done:
RUNLOCK_HASH(lck);
return DB_ERROR_NONE;
}
-
-int db_get_element_array(DbTable *tbl,
- Eterm key,
- int ndex,
- Eterm *ret,
- int *num_ret)
-{
- DbTableHash *tb = &tbl->hash;
- HashValue hval;
- int ix;
- HashDbTerm* b1;
- int num = 0;
- int retval;
- erts_smp_rwmtx_t* lck;
-
- ASSERT(!IS_FIXED(tbl)); /* no support for fixed tables here */
-
- hval = MAKE_HASH(key);
- lck = RLOCK_HASH(tb, hval);
- ix = hash_to_ix(tb, hval);
- b1 = BUCKET(tb, ix);
-
- while(b1 != 0) {
- if (has_live_key(tb,b1,key,hval)) {
- if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) {
- HashDbTerm* b;
- HashDbTerm* b2 = b1->next;
-
- while(b2 != NULL && has_live_key(tb,b2,key,hval)) {
- if (ndex > arityval(b2->dbterm.tpl[0])) {
- retval = DB_ERROR_BADITEM;
- goto done;
- }
- b2 = b2->next;
- }
-
- b = b1;
- while(b != b2) {
- if (num < *num_ret) {
- ret[num++] = b->dbterm.tpl[ndex];
- } else {
- retval = DB_ERROR_NONE;
- goto done;
- }
- b = b->next;
- }
- *num_ret = num;
- }
- else {
- ASSERT(*num_ret > 0);
- ret[0] = b1->dbterm.tpl[ndex];
- *num_ret = 1;
- }
- retval = DB_ERROR_NONE;
- goto done;
- }
- b1 = b1->next;
- }
- retval = DB_ERROR_BADKEY;
-done:
- RUNLOCK_HASH(lck);
- return retval;
-}
-
static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret)
{
@@ -1079,54 +1002,6 @@ done:
}
/*
- * Very internal interface, removes elements of arity two from
- * BAG. Used for the PID meta table
- */
-int db_erase_bag_exact2(DbTable *tbl, Eterm key, Eterm value)
-{
- DbTableHash *tb = &tbl->hash;
- HashValue hval;
- int ix;
- HashDbTerm** bp;
- HashDbTerm* b;
- erts_smp_rwmtx_t* lck;
- int found = 0;
-
- hval = MAKE_HASH(key);
- lck = WLOCK_HASH(tb,hval);
- ix = hash_to_ix(tb, hval);
- bp = &BUCKET(tb, ix);
- b = *bp;
-
- ASSERT(!IS_FIXED(tb));
- ASSERT((tb->common.status & DB_BAG));
- ASSERT(!tb->common.compress);
-
- while(b != 0) {
- if (has_live_key(tb,b,key,hval)) {
- found = 1;
- if ((arityval(b->dbterm.tpl[0]) == 2) &&
- EQ(value, b->dbterm.tpl[2])) {
- *bp = b->next;
- free_term(tb, b);
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- b = *bp;
- break;
- }
- } else if (found) {
- break;
- }
- bp = &b->next;
- b = b->next;
- }
- WUNLOCK_HASH(lck);
- if (found) {
- try_shrink(tb);
- }
- return DB_ERROR_NONE;
-}
-
-/*
** NB, this is for the db_erase/2 bif.
*/
int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret)
@@ -1273,816 +1148,1096 @@ static BIF_RETTYPE bif_trap1(Export *bif,
{
BIF_TRAP1(bif, p, p1);
}
-
+
+
/*
- * Continue collecting select matches, this may happen either due to a trap
- * or when the user calls ets:select/1
+ * Match traversal callbacks
*/
-static int db_select_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
-{
- DbTableHash *tb = &tbl->hash;
- Sint slot_ix;
- Sint save_slot_ix;
- Sint chunk_size;
- int all_objects;
- Binary *mp;
- int num_left = 1000;
- HashDbTerm *current = 0;
- Eterm match_list;
- Eterm *hp;
- Eterm match_res;
- Sint got;
- Eterm *tptr;
- erts_smp_rwmtx_t* lck;
-#define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0);
+/* Called when no match is possible.
+ * context_ptr: Pointer to context
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ */
+typedef int (*mtraversal_on_nothing_can_match_t)(void* context_ptr, Eterm* ret);
+
+/* Called for each match result.
+ * context_ptr: Pointer to context
+ * slot_ix: Current slot index
+ * current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element;
+ * can be (carefully) used to adjust iteration when deleting or replacing elements.
+ * match_res: The result of running the match program against the current term.
+ *
+ * Should return 1 for successful match, 0 otherwise.
+ */
+typedef int (*mtraversal_on_match_res_t)(void* context_ptr, Sint slot_ix, HashDbTerm*** current_ptr_ptr,
+ Eterm match_res);
+
+/* Called when either we've matched enough elements in this cycle or EOT was reached.
+ * context_ptr: Pointer to context
+ * slot_ix: Current slot index
+ * got: How many elements have been matched so far
+ * iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle
+ * mpp: Double pointer to the compiled match program
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
+ */
+typedef int (*mtraversal_on_loop_ended_t)(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret);
+
+/* Called when it's time to trap
+ * context_ptr: Pointer to context
+ * slot_ix: Current slot index
+ * got: How many elements have been matched so far
+ * mpp: Double pointer to the compiled match program
+ * ret: Pointer to traversal function term return.
+ *
+ * Both the direct return value and 'ret' are used as the traversal function return values.
+ * If *mpp is set to NULL, it won't be deallocated (useful for trapping.)
+ */
+typedef int (*mtraversal_on_trap_t)(void* context_ptr, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret);
- /* Decode continuation. We know it's a tuple but not the arity or anything else */
+/*
+ * Begin hash table match traversal
+ */
+static int match_traverse(Process* p, DbTableHash* tb,
+ Eterm pattern,
+ extra_match_validator_t extra_match_validator, /* Optional */
+ Sint chunk_size, /* If 0, no chunking */
+ Sint iterations_left, /* Nr. of iterations left */
+ Eterm** hpp, /* Heap */
+ int lock_for_write, /* Set to 1 if we're going to delete or
+ modify existing terms */
+ mtraversal_on_nothing_can_match_t on_nothing_can_match,
+ mtraversal_on_match_res_t on_match_res,
+ mtraversal_on_loop_ended_t on_loop_ended,
+ mtraversal_on_trap_t on_trap,
+ void* context_ptr, /* State for callbacks above */
+ Eterm* ret)
+{
+ Sint slot_ix; /* Slot index */
+ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
+ * the 'next' pointer in the previous term
+ */
+ HashDbTerm* saved_current; /* Helper to avoid double skip on match */
+ struct mp_info mpi;
+ unsigned current_list_pos = 0; /* Prefound buckets list index */
+ Eterm match_res;
+ Sint got = 0; /* Matched terms counter */
+ erts_smp_rwmtx_t* lck; /* Slot lock */
+ int ret_value;
+#ifdef ERTS_SMP
+ erts_smp_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
+ = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
+ void (*unlock_hash_function)(erts_smp_rwmtx_t*)
+ = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
+#else
+ #define lock_hash_function(tb, hval) NULL
+ #define unlock_hash_function(lck) ((void)lck)
+#endif
+ Sint (*next_slot_function)(DbTableHash*, Uint, erts_smp_rwmtx_t**)
+ = (lock_for_write ? next_slot_w : next_slot);
- tptr = tuple_val(continuation);
+ if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi))
+ != DB_ERROR_NONE)
+ {
+ *ret = NIL;
+ goto done;
+ }
- if (arityval(*tptr) != 6)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
-
- if (!is_small(tptr[2]) || !is_small(tptr[3]) || !is_binary(tptr[4]) ||
- !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- if ((chunk_size = signed_val(tptr[3])) < 0)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- mp = ((ProcBin *) binary_val(tptr[4]))->val;
- if (!IsMatchProgBinary(mp))
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
- all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS;
- match_list = tptr[5];
- if ((got = signed_val(tptr[6])) < 0)
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ if (!mpi.something_can_match) {
+ /* Can't possibly match anything */
+ ret_value = on_nothing_can_match(context_ptr, ret);
+ goto done;
+ }
- slot_ix = signed_val(tptr[2]);
- if (slot_ix < 0 /* EOT */
- || (chunk_size && got >= chunk_size)) {
- goto done; /* Already got all or enough in the match_list */
+ if (mpi.all_objects) {
+ mpi.mp->intern.flags |= BIN_FLAG_ALL_OBJECTS;
}
- lck = RLOCK_HASH(tb,slot_ix);
- if (slot_ix >= NACTIVE(tb)) {
- RUNLOCK_HASH(lck);
- RET_TO_BIF(NIL,DB_ERROR_BADPARAM);
+ /*
+ * Look for initial slot / bucket
+ */
+ if (!mpi.key_given) {
+ /* Run this code if pattern is variable or GETKEY(pattern) */
+ /* is a variable */
+ slot_ix = 0;
+ lck = lock_hash_function(tb,slot_ix);
+ for (;;) {
+ ASSERT(slot_ix < NACTIVE(tb));
+ if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) {
+ break;
+ }
+ slot_ix = next_slot_function(tb,slot_ix,&lck);
+ if (slot_ix == 0) {
+ ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret);
+ goto done;
+ }
+ }
+ } else {
+ /* We have at least one */
+ slot_ix = mpi.lists[current_list_pos].ix;
+ lck = lock_hash_function(tb, slot_ix);
+ current_ptr = mpi.lists[current_list_pos].bucket;
+ ASSERT(*current_ptr == BUCKET(tb,slot_ix));
+ ++current_list_pos;
}
- while ((current = BUCKET(tb,slot_ix)) == NULL) {
- slot_ix = next_slot(tb, slot_ix, &lck);
- if (slot_ix == 0) {
- slot_ix = -1; /* EOT */
- goto done;
- }
- }
+ /*
+ * Execute traversal cycle
+ */
for(;;) {
- if (current->hvalue != INVALID_HASH &&
- (match_res = db_match_dbterm(&tb->common, p, mp, all_objects,
- &current->dbterm, &hp, 2),
- is_value(match_res))) {
+ if (*current_ptr != NULL) {
+ if ((*current_ptr)->hvalue != INVALID_HASH) {
+ match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0,
+ &(*current_ptr)->dbterm, hpp, 2);
+ saved_current = *current_ptr;
+ if (on_match_res(context_ptr, slot_ix, &current_ptr, match_res)) {
+ ++got;
+ }
+ --iterations_left;
+ if (*current_ptr != saved_current) {
+ /* Don't advance to next, the callback did it already */
+ continue;
+ }
+ }
+ current_ptr = &((*current_ptr)->next);
+ }
+ else if (mpi.key_given) { /* Key is bound */
+ unlock_hash_function(lck);
+ if (current_list_pos == mpi.num_lists) {
+ ret_value = on_loop_ended(context_ptr, -1, got, iterations_left, &mpi.mp, ret);
+ goto done;
+ } else {
+ slot_ix = mpi.lists[current_list_pos].ix;
+ lck = lock_hash_function(tb, slot_ix);
+ current_ptr = mpi.lists[current_list_pos].bucket;
+ ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
+ ++current_list_pos;
+ }
+ }
+ else { /* Key is variable */
+ if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) {
+ slot_ix = -1;
+ break;
+ }
+ if (chunk_size && got >= chunk_size) {
+ unlock_hash_function(lck);
+ break;
+ }
+ if (iterations_left <= 0) {
+ unlock_hash_function(lck);
+ ret_value = on_trap(context_ptr, slot_ix, got, &mpi.mp, ret);
+ goto done;
+ }
+ current_ptr = &BUCKET(tb,slot_ix);
+ }
+ }
- match_list = CONS(hp, match_res, match_list);
- ++got;
- }
+ ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, &mpi.mp, ret);
- --num_left;
- save_slot_ix = slot_ix;
- if ((current = next(tb, (Uint*)&slot_ix, &lck, current)) == NULL) {
- slot_ix = -1; /* EOT */
- break;
- }
- if (slot_ix != save_slot_ix) {
- if (chunk_size && got >= chunk_size) {
- RUNLOCK_HASH(lck);
- break;
- }
- if (num_left <= 0 || MBUF(p)) {
- /*
- * We have either reached our limit, or just created some heap fragments.
- * Since many heap fragments will make the GC slower, trap and GC now.
- */
- RUNLOCK_HASH(lck);
- goto trap;
- }
- }
- }
done:
- BUMP_REDS(p, 1000 - num_left);
- if (chunk_size) {
- Eterm continuation;
- Eterm rest = NIL;
- Sint rest_size = 0;
-
- if (got > chunk_size) { /* Cannot write destructively here,
- the list may have
- been in user space */
- rest = NIL;
- hp = HAlloc(p, (got - chunk_size) * 2);
- while (got-- > chunk_size) {
- rest = CONS(hp, CAR(list_val(match_list)), rest);
- hp += 2;
- match_list = CDR(list_val(match_list));
- ++rest_size;
- }
- }
- if (rest != NIL || slot_ix >= 0) {
- hp = HAlloc(p,3+7);
- continuation = TUPLE6(hp, tptr[1], make_small(slot_ix),
- tptr[3], tptr[4], rest,
- make_small(rest_size));
- hp += 7;
- RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
- } else {
- if (match_list != NIL) {
- hp = HAlloc(p, 3);
- RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
- } else {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE);
- }
- }
+ /* We should only jump directly to this label if
+ * we've already called on_nothing_can_match / on_loop_ended / on_trap
+ */
+ if (mpi.mp != NULL) {
+ erts_bin_free(mpi.mp);
}
- RET_TO_BIF(match_list,DB_ERROR_NONE);
-
-trap:
- BUMP_ALL_REDS(p);
-
- hp = HAlloc(p,7);
- continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3],
- tptr[4], match_list, make_small(got));
- RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
-
-#undef RET_TO_BIF
-
-}
+ if (mpi.lists != mpi.dlists) {
+ erts_free(ERTS_ALC_T_DB_SEL_LIST,
+ (void *) mpi.lists);
+ }
+ return ret_value;
-static int db_select_hash(Process *p, DbTable *tbl,
- Eterm pattern, int reverse,
- Eterm *ret)
-{
- return db_select_chunk_hash(p, tbl, pattern, 0, reverse, ret);
+#ifndef SMP
+#undef lock_hash_function
+#undef unlock_hash_function
+#endif
}
-static int db_select_chunk_hash(Process *p, DbTable *tbl,
- Eterm pattern, Sint chunk_size,
- int reverse, /* not used */
- Eterm *ret)
+/*
+ * Continue hash table match traversal
+ */
+static int match_traverse_continue(Process* p, DbTableHash* tb,
+ Sint chunk_size, /* If 0, no chunking */
+ Sint iterations_left, /* Nr. of iterations left */
+ Eterm** hpp, /* Heap */
+ Sint slot_ix, /* Slot index to resume traversal from */
+ Sint got, /* Matched terms counter */
+ Binary** mpp, /* Existing match program */
+ int lock_for_write, /* Set to 1 if we're going to delete or
+ modify existing terms */
+ mtraversal_on_match_res_t on_match_res,
+ mtraversal_on_loop_ended_t on_loop_ended,
+ mtraversal_on_trap_t on_trap,
+ void* context_ptr, /* For callbacks */
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Sint slot_ix;
- HashDbTerm *current = 0;
- unsigned current_list_pos = 0;
- Eterm match_list;
+ int all_objects = (*mpp)->intern.flags & BIN_FLAG_ALL_OBJECTS;
+ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or
+ * the 'next' pointer in the previous term
+ */
+ HashDbTerm* saved_current; /* Helper to avoid double skip on match */
Eterm match_res;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
- Eterm continuation;
- int errcode;
- Eterm mpb;
erts_smp_rwmtx_t* lck;
+ int ret_value;
+#ifdef ERTS_SMP
+ erts_smp_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue)
+ = (lock_for_write ? WLOCK_HASH : RLOCK_HASH);
+ void (*unlock_hash_function)(erts_smp_rwmtx_t*)
+ = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH);
+#else
+ #define lock_hash_function(tb, hval) NULL
+ #define unlock_hash_function(lck) ((void)lck)
+#endif
+ Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_smp_rwmtx_t** lck_ptr)
+ = (lock_for_write ? next_slot_w : next_slot);
-
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
-
-
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
+ if (got < 0) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
}
- if (!mpi.something_can_match) {
- if (chunk_size) {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
- }
- RET_TO_BIF(NIL, DB_ERROR_NONE);
- /* can't possibly match anything */
+ if (slot_ix < 0 /* EOT */
+ || (chunk_size && got >= chunk_size))
+ {
+ /* Already got all or enough in the match_list */
+ ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret);
+ goto done;
}
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- slot_ix = 0;
- lck = RLOCK_HASH(tb,slot_ix);
- for (;;) {
- ASSERT(slot_ix < NACTIVE(tb));
- if ((current = BUCKET(tb,slot_ix)) != NULL) {
- break;
- }
- slot_ix = next_slot(tb,slot_ix,&lck);
- if (slot_ix == 0) {
- if (chunk_size) {
- RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */
- }
- RET_TO_BIF(NIL,DB_ERROR_NONE);
- }
- }
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(current == BUCKET(tb,slot_ix));
- ++current_list_pos;
+ lck = lock_hash_function(tb, slot_ix);
+ if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */
+ unlock_hash_function(lck);
+ *ret = NIL;
+ ret_value = DB_ERROR_BADPARAM;
+ goto done;
}
- match_list = NIL;
-
+ /*
+ * Resume traversal cycle from where we left
+ */
+ current_ptr = &BUCKET(tb,slot_ix);
for(;;) {
- if (current != NULL) {
- if (current->hvalue != INVALID_HASH) {
- match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &current->dbterm, &hp, 2);
- if (is_value(match_res)) {
- match_list = CONS(hp, match_res, match_list);
- ++got;
- }
- }
- current = current->next;
- }
- else if (mpi.key_given) { /* Key is bound */
- RUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- slot_ix = -1; /* EOT */
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- }
- else { /* Key is variable */
- --num_left;
-
- if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
- slot_ix = -1;
- break;
- }
- if (chunk_size && got >= chunk_size) {
- RUNLOCK_HASH(lck);
- break;
- }
- if (num_left <= 0 || MBUF(p)) {
- /*
- * We have either reached our limit, or just created some heap fragments.
- * Since many heap fragments will make the GC slower, trap and GC now.
- */
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
+ if (*current_ptr != NULL) {
+ if ((*current_ptr)->hvalue != INVALID_HASH) {
+ match_res = db_match_dbterm(&tb->common, p, *mpp, all_objects,
+ &(*current_ptr)->dbterm, hpp, 2);
+ saved_current = *current_ptr;
+ if (on_match_res(context_ptr, slot_ix, &current_ptr, match_res)) {
+ ++got;
+ }
+ --iterations_left;
+ if (*current_ptr != saved_current) {
+ /* Don't advance to next, the callback did it already */
+ continue;
+ }
+ }
+ current_ptr = &((*current_ptr)->next);
+ }
+ else {
+ if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) {
+ slot_ix = -1;
+ break;
+ }
+ if (chunk_size && got >= chunk_size) {
+ unlock_hash_function(lck);
+ break;
+ }
+ if (iterations_left <= 0) {
+ unlock_hash_function(lck);
+ ret_value = on_trap(context_ptr, slot_ix, got, mpp, ret);
+ goto done;
+ }
+ current_ptr = &BUCKET(tb,slot_ix);
}
}
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (chunk_size) {
- Eterm continuation;
- Eterm rest = NIL;
- Sint rest_size = 0;
-
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- if (got > chunk_size) { /* Split list in return value and 'rest' */
- Eterm tmp = match_list;
- rest = match_list;
- while (got-- > chunk_size + 1) {
- tmp = CDR(list_val(tmp));
- ++rest_size;
- }
- ++rest_size;
- match_list = CDR(list_val(tmp));
- CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
- been in 'user space' */
- }
- if (rest != NIL || slot_ix >= 0) { /* Need more calls */
- hp = HAlloc(p,3+7+PROC_BIN_SIZE);
- mpb =db_make_mp_binary(p,(mpi.mp),&hp);
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- continuation = TUPLE6(hp, tb->common.id,make_small(slot_ix),
- make_small(chunk_size),
- mpb, rest,
- make_small(rest_size));
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- hp += 7;
- RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE);
- } else { /* All data is exhausted */
- if (match_list != NIL) { /* No more data to search but still a
- result to return to the caller */
- hp = HAlloc(p, 3);
- RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE);
- } else { /* Reached the end of the ttable with no data to return */
- RET_TO_BIF(am_EOT, DB_ERROR_NONE);
- }
- }
- }
- RET_TO_BIF(match_list,DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (mpi.all_objects)
- (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS;
- hp = HAlloc(p,7+PROC_BIN_SIZE);
- mpb =db_make_mp_binary(p,(mpi.mp),&hp);
- continuation = TUPLE6(hp, tb->common.id, make_small(slot_ix),
- make_small(chunk_size),
- mpb, match_list,
- make_small(got));
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
-#undef RET_TO_BIF
+ ret_value = on_loop_ended(context_ptr, slot_ix, got, iterations_left, mpp, ret);
+
+done:
+ /* We should only jump directly to this label if
+ * we've already called on_loop_ended / on_trap
+ */
+ return ret_value;
+#ifndef SMP
+#undef lock_hash_function
+#undef unlock_hash_function
+#endif
}
-static int db_select_count_hash(Process *p,
- DbTable *tbl,
- Eterm pattern,
- Eterm *ret)
+
+/*
+ * Common traversal trapping/continuation code;
+ * used by select_count, select_delete and select_replace,
+ * as well as their continuation-handling counterparts.
+ */
+
+static ERTS_INLINE int on_mtraversal_simple_trap(Export* trap_function,
+ Process* p,
+ DbTableHash* tb,
+ Eterm tid,
+ Eterm* prev_continuation_tptr,
+ Sint slot_ix,
+ Sint got,
+ Binary** mpp,
+ Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Uint slot_ix = 0;
- HashDbTerm* current = NULL;
- unsigned current_list_pos = 0;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
- Eterm continuation;
- int errcode;
+ Eterm* hp;
Eterm egot;
Eterm mpb;
- erts_smp_rwmtx_t* lck;
-
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
-
-
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
- }
-
- if (!mpi.something_can_match) {
- RET_TO_BIF(make_small(0), DB_ERROR_NONE);
- /* can't possibly match anything */
- }
-
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- slot_ix = 0;
- lck = RLOCK_HASH(tb,slot_ix);
- current = BUCKET(tb,slot_ix);
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(current == BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
+ Eterm continuation;
+ int is_first_trap = (prev_continuation_tptr == NULL);
+ size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0);
- for(;;) {
- if (current != NULL) {
- if (current->hvalue != INVALID_HASH) {
- if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &current->dbterm, NULL,0) == am_true) {
- ++got;
- }
- --num_left;
- }
- current = current->next;
- }
- else { /* next bucket */
- if (mpi.key_given) { /* Key is bound */
- RUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = RLOCK_HASH(tb, slot_ix);
- current = *(mpi.lists[current_list_pos].bucket);
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- }
- else {
- if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
- }
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
BUMP_ALL_REDS(p);
if (IS_USMALL(0, got)) {
- hp = HAlloc(p, PROC_BIN_SIZE + 5);
+ hp = HAlloc(p, base_halloc_sz + 5);
egot = make_small(got);
}
else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
+ hp = HAlloc(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5);
egot = uint_to_big(got, hp);
hp += BIG_UINT_HEAP_SIZE;
}
- mpb = db_make_mp_binary(p,mpi.mp,&hp);
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- mpb,
- egot);
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
-#undef RET_TO_BIF
+ if (is_first_trap) {
+ mpb = erts_db_make_match_prog_ref(p, *mpp, &hp);
+ *mpp = NULL; /* otherwise the caller will destroy it */
+ }
+ else {
+ mpb = prev_continuation_tptr[3];
+ }
+
+ continuation = TUPLE4(
+ hp,
+ tid,
+ make_small(slot_ix),
+ mpb,
+ egot);
+ *ret = bif_trap1(trap_function, p, continuation);
+ return DB_ERROR_NONE;
}
-static int db_select_delete_hash(Process *p,
- DbTable *tbl,
- Eterm pattern,
- Eterm *ret)
+static ERTS_INLINE int unpack_simple_mtraversal_continuation(Eterm continuation,
+ Eterm** tptr_ptr,
+ Eterm* tid_ptr,
+ Sint* slot_ix_p,
+ Binary** mpp,
+ Sint* got_p)
{
- DbTableHash *tb = &tbl->hash;
- struct mp_info mpi;
- Uint slot_ix = 0;
- HashDbTerm **current = NULL;
- unsigned current_list_pos = 0;
- Eterm *hp;
- int num_left = 1000;
- Uint got = 0;
- Eterm continuation;
- int errcode;
- Uint last_pseudo_delete = (Uint)-1;
- Eterm mpb;
- Eterm egot;
-#ifdef ERTS_SMP
- erts_aint_t fixated_by_me = tb->common.is_thread_safe ? 0 : 1; /* ToDo: something nicer */
-#else
- erts_aint_t fixated_by_me = 0;
-#endif
- erts_smp_rwmtx_t* lck;
-
-#define RET_TO_BIF(Term,RetVal) do { \
- if (mpi.mp != NULL) { \
- erts_bin_free(mpi.mp); \
- } \
- if (mpi.lists != mpi.dlists) { \
- erts_free(ERTS_ALC_T_DB_SEL_LIST, \
- (void *) mpi.lists); \
- } \
- *ret = (Term); \
- return RetVal; \
- } while(0)
-
+ Eterm* tptr;
+ ASSERT(is_tuple(continuation));
+ tptr = tuple_val(continuation);
+ if (arityval(*tptr) != 4)
+ return 1;
- if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) {
- RET_TO_BIF(NIL,errcode);
+ if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) {
+ return 1;
}
- if (!mpi.something_can_match) {
- RET_TO_BIF(make_small(0), DB_ERROR_NONE);
- /* can't possibly match anything */
+ *tptr_ptr = tptr;
+ *tid_ptr = tptr[1];
+ *slot_ix_p = unsigned_val(tptr[2]);
+ *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]);
+ if (is_big(tptr[4])) {
+ *got_p = big_to_uint32(tptr[4]);
}
+ else {
+ *got_p = unsigned_val(tptr[4]);
+ }
+ return 0;
+}
- if (!mpi.key_given) {
- /* Run this code if pattern is variable or GETKEY(pattern) */
- /* is a variable */
- lck = WLOCK_HASH(tb,slot_ix);
- current = &BUCKET(tb,slot_ix);
- } else {
- /* We have at least one */
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = WLOCK_HASH(tb, slot_ix);
- current = mpi.lists[current_list_pos++].bucket;
- ASSERT(*current == BUCKET(tb,slot_ix));
+
+/*
+ *
+ * select / select_chunk match traversal
+ *
+ */
+
+#define MAX_SELECT_CHUNK_ITERATIONS 1000
+
+typedef struct {
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* hp;
+ Sint chunk_size;
+ Eterm match_list;
+ Eterm* prev_continuation_tptr;
+} mtraversal_select_chunk_context_t;
+
+static int mtraversal_select_chunk_on_nothing_can_match(void* context_ptr, Eterm* ret) {
+ mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
+ *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL);
+ return DB_ERROR_NONE;
+}
+
+static int mtraversal_select_chunk_on_match_res(void* context_ptr, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
+ if (is_value(match_res)) {
+ sc_context_ptr->match_list = CONS(sc_context_ptr->hp, match_res, sc_context_ptr->match_list);
+ return 1;
}
+ return 0;
+}
+static int mtraversal_select_chunk_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
+ Eterm mpb;
- for(;;) {
- if ((*current) == NULL) {
- if (mpi.key_given) { /* Key is bound */
- WUNLOCK_HASH(lck);
- if (current_list_pos == mpi.num_lists) {
- goto done;
- } else {
- slot_ix = mpi.lists[current_list_pos].ix;
- lck = WLOCK_HASH(tb, slot_ix);
- current = mpi.lists[current_list_pos].bucket;
- ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix));
- ++current_list_pos;
- }
- } else {
- if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- WUNLOCK_HASH(lck);
- goto trap;
- }
- current = &BUCKET(tb,slot_ix);
- }
- }
- else if ((*current)->hvalue == INVALID_HASH) {
- current = &((*current)->next);
- }
- else {
- int did_erase = 0;
- if (db_match_dbterm(&tb->common, p, mpi.mp, 0,
- &(*current)->dbterm, NULL, 0) == am_true) {
- HashDbTerm *del;
- if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
- if (slot_ix != last_pseudo_delete) {
- if (!add_fixed_deletion(tb, slot_ix, fixated_by_me))
- goto do_erase;
- last_pseudo_delete = slot_ix;
- }
- (*current)->hvalue = INVALID_HASH;
- } else {
- do_erase:
- del = *current;
- *current = (*current)->next;
- free_term(tb, del);
- did_erase = 1;
- }
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- ++got;
- }
- --num_left;
- if (!did_erase) {
- current = &((*current)->next);
- }
- }
+ if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) {
+ /* We didn't get to iterate a single time, which means EOT */
+ ASSERT(sc_context_ptr->match_list == NIL);
+ *ret = (sc_context_ptr->chunk_size > 0 ? am_EOT : NIL);
+ return DB_ERROR_NONE;
}
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (got) {
- try_shrink(tb);
+ else {
+ ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS);
+ BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
+ if (sc_context_ptr->chunk_size) {
+ Eterm continuation;
+ Eterm rest = NIL;
+ Sint rest_size = 0;
+
+ if (got > sc_context_ptr->chunk_size) { /* Split list in return value and 'rest' */
+ Eterm tmp = sc_context_ptr->match_list;
+ rest = sc_context_ptr->match_list;
+ while (got-- > sc_context_ptr->chunk_size + 1) {
+ tmp = CDR(list_val(tmp));
+ ++rest_size;
+ }
+ ++rest_size;
+ sc_context_ptr->match_list = CDR(list_val(tmp));
+ CDR(list_val(tmp)) = NIL; /* Destructive, the list has never
+ been in 'user space' */
+ }
+ if (rest != NIL || slot_ix >= 0) { /* Need more calls */
+ sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3 + 7 + ERTS_MAGIC_REF_THING_SIZE);
+ mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &sc_context_ptr->hp);
+ continuation = TUPLE6(
+ sc_context_ptr->hp,
+ sc_context_ptr->tid,
+ make_small(slot_ix),
+ make_small(sc_context_ptr->chunk_size),
+ mpb, rest,
+ make_small(rest_size));
+ *mpp = NULL; /* Otherwise the caller will destroy it */
+ sc_context_ptr->hp += 7;
+ *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, continuation);
+ return DB_ERROR_NONE;
+ } else { /* All data is exhausted */
+ if (sc_context_ptr->match_list != NIL) { /* No more data to search but still a
+ result to return to the caller */
+ sc_context_ptr->hp = HAlloc(sc_context_ptr->p, 3);
+ *ret = TUPLE2(sc_context_ptr->hp, sc_context_ptr->match_list, am_EOT);
+ return DB_ERROR_NONE;
+ } else { /* Reached the end of the ttable with no data to return */
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ }
+ }
+ *ret = sc_context_ptr->match_list;
+ return DB_ERROR_NONE;
}
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, PROC_BIN_SIZE + 5);
- egot = make_small(got);
+}
+
+static int mtraversal_select_chunk_on_trap(void* context_ptr, Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
+ Eterm mpb;
+ Eterm continuation;
+ Eterm* hp;
+
+ BUMP_ALL_REDS(sc_context_ptr->p);
+
+ if (sc_context_ptr->prev_continuation_tptr == NULL) {
+ /* First time we're trapping */
+ hp = HAlloc(sc_context_ptr->p, 7 + ERTS_MAGIC_REF_THING_SIZE);
+ mpb = erts_db_make_match_prog_ref(sc_context_ptr->p, *mpp, &hp);
+ continuation = TUPLE6(
+ hp,
+ sc_context_ptr->tid,
+ make_small(slot_ix),
+ make_small(sc_context_ptr->chunk_size),
+ mpb,
+ sc_context_ptr->match_list,
+ make_small(got));
+ *mpp = NULL; /* otherwise the caller will destroy it */
}
else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
- }
- mpb = db_make_mp_binary(p,mpi.mp,&hp);
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- mpb,
- egot);
- mpi.mp = NULL; /*otherwise the return macro will destroy it */
- RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ /* Not the first time we're trapping; reuse continuation terms */
+ hp = HAlloc(sc_context_ptr->p, 7);
+ continuation = TUPLE6(
+ hp,
+ sc_context_ptr->prev_continuation_tptr[1],
+ make_small(slot_ix),
+ sc_context_ptr->prev_continuation_tptr[3],
+ sc_context_ptr->prev_continuation_tptr[4],
+ sc_context_ptr->match_list,
+ make_small(got));
+ }
+ *ret = bif_trap1(&ets_select_continue_exp, sc_context_ptr->p, continuation);
+ return DB_ERROR_NONE;
+}
-#undef RET_TO_BIF
+static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) {
+ return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret);
+}
+static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size,
+ int reverse, Eterm *ret)
+{
+ mtraversal_select_chunk_context_t sc_context;
+ sc_context.p = p;
+ sc_context.tb = &tbl->hash;
+ sc_context.tid = tid;
+ sc_context.hp = NULL;
+ sc_context.chunk_size = chunk_size;
+ sc_context.match_list = NIL;
+ sc_context.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ sc_context.p, sc_context.tb,
+ pattern, NULL,
+ sc_context.chunk_size,
+ MAX_SELECT_CHUNK_ITERATIONS,
+ &sc_context.hp, 0,
+ mtraversal_select_chunk_on_nothing_can_match,
+ mtraversal_select_chunk_on_match_res,
+ mtraversal_select_chunk_on_loop_ended,
+ mtraversal_select_chunk_on_trap,
+ &sc_context, ret);
}
+
/*
-** This is called when select_delete traps
-*/
-static int db_select_delete_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+ *
+ * select_continue match traversal
+ *
+ */
+
+static int mtraversal_select_chunk_continue_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret)
{
- DbTableHash *tb = &tbl->hash;
- Uint slot_ix;
- Uint last_pseudo_delete = (Uint)-1;
- HashDbTerm **current = NULL;
- Eterm *hp;
- int num_left = 1000;
- Uint got;
- Eterm *tptr;
- Binary *mp;
- Eterm egot;
- int fixated_by_me = ONLY_WRITER(p,tb) ? 0 : 1; /* ToDo: something nicer */
- erts_smp_rwmtx_t* lck;
+ mtraversal_select_chunk_context_t* sc_context_ptr = (mtraversal_select_chunk_context_t*) context_ptr;
+ Eterm continuation;
+ Eterm rest = NIL;
+ Eterm* hp;
+
+ ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS);
+ BUMP_REDS(sc_context_ptr->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left);
+ if (sc_context_ptr->chunk_size) {
+ Sint rest_size = 0;
+ if (got > sc_context_ptr->chunk_size) {
+ /* Cannot write destructively here,
+ the list may have
+ been in user space */
+ hp = HAlloc(sc_context_ptr->p, (got - sc_context_ptr->chunk_size) * 2);
+ while (got-- > sc_context_ptr->chunk_size) {
+ rest = CONS(hp, CAR(list_val(sc_context_ptr->match_list)), rest);
+ hp += 2;
+ sc_context_ptr->match_list = CDR(list_val(sc_context_ptr->match_list));
+ ++rest_size;
+ }
+ }
+ if (rest != NIL || slot_ix >= 0) {
+ hp = HAlloc(sc_context_ptr->p, 3 + 7);
+ continuation = TUPLE6(
+ hp,
+ sc_context_ptr->prev_continuation_tptr[1],
+ make_small(slot_ix),
+ sc_context_ptr->prev_continuation_tptr[3],
+ sc_context_ptr->prev_continuation_tptr[4],
+ rest,
+ make_small(rest_size));
+ hp += 7;
+ *ret = TUPLE2(hp, sc_context_ptr->match_list, continuation);
+ return DB_ERROR_NONE;
+ } else {
+ if (sc_context_ptr->match_list != NIL) {
+ hp = HAlloc(sc_context_ptr->p, 3);
+ *ret = TUPLE2(hp, sc_context_ptr->match_list, am_EOT);
+ return DB_ERROR_NONE;
+ } else {
+ *ret = am_EOT;
+ return DB_ERROR_NONE;
+ }
+ }
+ }
+ *ret = sc_context_ptr->match_list;
+ return DB_ERROR_NONE;
+}
-#define RET_TO_BIF(Term,RetVal) do { \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+/*
+ * This is called when select traps
+ */
+static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
+ mtraversal_select_chunk_context_t sc_context = {0};
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size;
+ Eterm match_list;
+ Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS;
-
+ /* Decode continuation. We know it's a tuple but not the arity or anything else */
+ ASSERT(is_tuple(continuation));
tptr = tuple_val(continuation);
- slot_ix = unsigned_val(tptr[2]);
- mp = ((ProcBin *) binary_val(tptr[3]))->val;
- if (is_big(tptr[4])) {
- got = big_to_uint32(tptr[4]);
- } else {
- got = unsigned_val(tptr[4]);
- }
-
- lck = WLOCK_HASH(tb,slot_ix);
- if (slot_ix >= NACTIVE(tb)) {
- WUNLOCK_HASH(lck);
- goto done;
- }
- current = &BUCKET(tb,slot_ix);
- for(;;) {
- if ((*current) == NULL) {
- if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- WUNLOCK_HASH(lck);
- goto trap;
- }
- current = &BUCKET(tb,slot_ix);
- }
- else if ((*current)->hvalue == INVALID_HASH) {
- current = &((*current)->next);
- }
- else {
- int did_erase = 0;
- if (db_match_dbterm(&tb->common, p, mp, 0,
- &(*current)->dbterm, NULL, 0) == am_true) {
- HashDbTerm *del;
- if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */
- if (slot_ix != last_pseudo_delete) {
- if (!add_fixed_deletion(tb, slot_ix, fixated_by_me))
- goto do_erase;
- last_pseudo_delete = slot_ix;
- }
- (*current)->hvalue = INVALID_HASH;
- } else {
- do_erase:
- del = *current;
- *current = (*current)->next;
- free_term(tb, del);
- did_erase = 1;
- }
- erts_smp_atomic_dec_nob(&tb->common.nitems);
- ++got;
- }
-
- --num_left;
- if (!did_erase) {
- current = &((*current)->next);
- }
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- if (got) {
- try_shrink(tb);
- }
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, 5);
- egot = make_small(got);
- }
- else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
- }
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- tptr[3],
- egot);
- RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ if (arityval(*tptr) != 6)
+ goto badparam;
-#undef RET_TO_BIF
+ if (!is_small(tptr[2]) || !is_small(tptr[3]) ||
+ !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6]))
+ goto badparam;
+ if ((chunk_size = signed_val(tptr[3])) < 0)
+ goto badparam;
+ mp = erts_db_get_match_prog_binary(tptr[4]);
+ if (mp == NULL)
+ goto badparam;
+
+ if ((got = signed_val(tptr[6])) < 0)
+ goto badparam;
+
+ tid = tptr[1];
+ slot_ix = signed_val(tptr[2]);
+ match_list = tptr[5];
+
+ /* Proceed */
+ sc_context.p = p;
+ sc_context.tb = &tbl->hash;
+ sc_context.tid = tid;
+ sc_context.hp = NULL;
+ sc_context.chunk_size = chunk_size;
+ sc_context.match_list = match_list;
+ sc_context.prev_continuation_tptr = tptr;
+
+ return match_traverse_continue(
+ sc_context.p, sc_context.tb, sc_context.chunk_size,
+ iterations_left, &sc_context.hp, slot_ix, got, &mp, 0,
+ mtraversal_select_chunk_on_match_res, /* Reuse callback */
+ mtraversal_select_chunk_continue_on_loop_ended,
+ mtraversal_select_chunk_on_trap, /* Reuse callback */
+ &sc_context, ret);
+
+badparam:
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
}
-
+
+#undef MAX_SELECT_CHUNK_ITERATIONS
+
+
/*
-** This is called when select_count traps
-*/
-static int db_select_count_continue_hash(Process *p,
- DbTable *tbl,
- Eterm continuation,
- Eterm *ret)
+ *
+ * select_count match traversal
+ *
+ */
+
+#define MAX_SELECT_COUNT_ITERATIONS 1000
+
+typedef struct {
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* hp;
+ Eterm* prev_continuation_tptr;
+} mtraversal_select_count_context_t;
+
+static int mtraversal_select_count_on_nothing_can_match(void* context_ptr, Eterm* ret) {
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
+
+static int mtraversal_select_count_on_match_res(void* context_ptr, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
{
- DbTableHash *tb = &tbl->hash;
- Uint slot_ix;
- HashDbTerm* current;
- Eterm *hp;
- int num_left = 1000;
- Uint got;
- Eterm *tptr;
- Binary *mp;
- Eterm egot;
- erts_smp_rwmtx_t* lck;
+ return (match_res == am_true);
+}
-#define RET_TO_BIF(Term,RetVal) do { \
- *ret = (Term); \
- return RetVal; \
- } while(0)
+static int mtraversal_select_count_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr;
+ ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS);
+ BUMP_REDS(scnt_context_ptr->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left);
+ *ret = erts_make_integer(got, scnt_context_ptr->p);
+ return DB_ERROR_NONE;
+}
-
- tptr = tuple_val(continuation);
- slot_ix = unsigned_val(tptr[2]);
- mp = ((ProcBin *) binary_val(tptr[3]))->val;
- if (is_big(tptr[4])) {
- got = big_to_uint32(tptr[4]);
- } else {
- got = unsigned_val(tptr[4]);
- }
-
+static int mtraversal_select_count_on_trap(void* context_ptr, Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_count_context_t* scnt_context_ptr = (mtraversal_select_count_context_t*) context_ptr;
+ return on_mtraversal_simple_trap(
+ &ets_select_count_continue_exp,
+ scnt_context_ptr->p,
+ scnt_context_ptr->tb,
+ scnt_context_ptr->tid,
+ scnt_context_ptr->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
+}
- lck = RLOCK_HASH(tb, slot_ix);
- if (slot_ix >= NACTIVE(tb)) { /* Is this posible? */
- RUNLOCK_HASH(lck);
- goto done;
- }
- current = BUCKET(tb,slot_ix);
-
- for(;;) {
- if (current != NULL) {
- if (current->hvalue == INVALID_HASH) {
- current = current->next;
- continue;
- }
- if (db_match_dbterm(&tb->common, p, mp, 0, &current->dbterm,
- NULL, 0) == am_true) {
- ++got;
- }
- --num_left;
- current = current->next;
- }
- else { /* next bucket */
- if ((slot_ix = next_slot(tb,slot_ix,&lck)) == 0) {
- goto done;
- }
- if (num_left <= 0) {
- RUNLOCK_HASH(lck);
- goto trap;
- }
- current = BUCKET(tb,slot_ix);
- }
- }
-done:
- BUMP_REDS(p, 1000 - num_left);
- RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE);
-trap:
- BUMP_ALL_REDS(p);
- if (IS_USMALL(0, got)) {
- hp = HAlloc(p, 5);
- egot = make_small(got);
+static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) {
+ mtraversal_select_count_context_t scnt_context = {0};
+ Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS;
+ Sint chunk_size = 0;
+
+ scnt_context.p = p;
+ scnt_context.tb = &tbl->hash;
+ scnt_context.tid = tid;
+ scnt_context.hp = NULL;
+ scnt_context.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ scnt_context.p, scnt_context.tb,
+ pattern, NULL,
+ chunk_size, iterations_left, NULL, 0,
+ mtraversal_select_count_on_nothing_can_match,
+ mtraversal_select_count_on_match_res,
+ mtraversal_select_count_on_loop_ended,
+ mtraversal_select_count_on_trap,
+ &scnt_context, ret);
+}
+
+/*
+ * This is called when select_count traps
+ */
+static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
+ mtraversal_select_count_context_t scnt_context = {0};
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+ *ret = NIL;
+
+ if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
+ }
+
+ scnt_context.p = p;
+ scnt_context.tb = &tbl->hash;
+ scnt_context.tid = tid;
+ scnt_context.hp = NULL;
+ scnt_context.prev_continuation_tptr = tptr;
+
+ return match_traverse_continue(
+ scnt_context.p, scnt_context.tb, chunk_size,
+ MAX_SELECT_COUNT_ITERATIONS,
+ NULL, slot_ix, got, &mp, 0,
+ mtraversal_select_count_on_match_res, /* Reuse callback */
+ mtraversal_select_count_on_loop_ended, /* Reuse callback */
+ mtraversal_select_count_on_trap, /* Reuse callback */
+ &scnt_context, ret);
+}
+
+#undef MAX_SELECT_COUNT_ITERATIONS
+
+
+/*
+ *
+ * select_delete match traversal
+ *
+ */
+
+#define MAX_SELECT_DELETE_ITERATIONS 1000
+
+typedef struct {
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* hp;
+ Eterm* prev_continuation_tptr;
+ erts_aint_t fixated_by_me;
+ Uint last_pseudo_delete;
+} mtraversal_select_delete_context_t;
+
+static int mtraversal_select_delete_on_nothing_can_match(void* context_ptr, Eterm* ret) {
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
+
+static int mtraversal_select_delete_on_match_res(void* context_ptr, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ HashDbTerm** current_ptr = *current_ptr_ptr;
+ mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
+ HashDbTerm* del;
+ if (match_res != am_true)
+ return 0;
+
+ if (NFIXED(sd_context_ptr->tb) > sd_context_ptr->fixated_by_me) { /* fixated by others? */
+ if (slot_ix != sd_context_ptr->last_pseudo_delete) {
+ if (!add_fixed_deletion(sd_context_ptr->tb, slot_ix, sd_context_ptr->fixated_by_me))
+ goto do_erase;
+ sd_context_ptr->last_pseudo_delete = slot_ix;
+ }
+ (*current_ptr)->hvalue = INVALID_HASH;
}
else {
- hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5);
- egot = uint_to_big(got, hp);
- hp += BIG_UINT_HEAP_SIZE;
+ do_erase:
+ del = *current_ptr;
+ *current_ptr = (*current_ptr)->next; // replace pointer to term using next
+ free_term(sd_context_ptr->tb, del);
}
- continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix),
- tptr[3],
- egot);
- RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p,
- continuation),
- DB_ERROR_NONE);
+ erts_smp_atomic_dec_nob(&sd_context_ptr->tb->common.nitems);
-#undef RET_TO_BIF
+ return 1;
+}
+static int mtraversal_select_delete_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
+ ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS);
+ BUMP_REDS(sd_context_ptr->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left);
+ if (got) {
+ try_shrink(sd_context_ptr->tb);
+ }
+ *ret = erts_make_integer(got, sd_context_ptr->p);
+ return DB_ERROR_NONE;
}
-
+
+static int mtraversal_select_delete_on_trap(void* context_ptr, Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_delete_context_t* sd_context_ptr = (mtraversal_select_delete_context_t*) context_ptr;
+ return on_mtraversal_simple_trap(
+ &ets_select_delete_continue_exp,
+ sd_context_ptr->p,
+ sd_context_ptr->tb,
+ sd_context_ptr->tid,
+ sd_context_ptr->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
+}
+
+static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) {
+ mtraversal_select_delete_context_t sd_context = {0};
+ Sint chunk_size = 0;
+
+ sd_context.p = p;
+ sd_context.tb = &tbl->hash;
+ sd_context.tid = tid;
+ sd_context.hp = NULL;
+ sd_context.prev_continuation_tptr = NULL;
+#ifdef ERTS_SMP
+ sd_context.fixated_by_me = sd_context.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */
+#else
+ sd_context.fixated_by_me = 0;
+#endif
+ sd_context.last_pseudo_delete = (Uint) -1;
+
+ return match_traverse(
+ sd_context.p, sd_context.tb,
+ pattern, NULL,
+ chunk_size,
+ MAX_SELECT_DELETE_ITERATIONS, NULL, 1,
+ mtraversal_select_delete_on_nothing_can_match,
+ mtraversal_select_delete_on_match_res,
+ mtraversal_select_delete_on_loop_ended,
+ mtraversal_select_delete_on_trap,
+ &sd_context, ret);
+}
+
+/*
+ * This is called when select_delete traps
+ */
+static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) {
+ mtraversal_select_delete_context_t sd_context = {0};
+ Eterm* tptr;
+ Eterm tid;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+
+ if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
+ }
+
+ sd_context.p = p;
+ sd_context.tb = &tbl->hash;
+ sd_context.tid = tid;
+ sd_context.hp = NULL;
+ sd_context.prev_continuation_tptr = tptr;
+ sd_context.fixated_by_me = ONLY_WRITER(p, sd_context.tb) ? 0 : 1; /* TODO: something nicer */
+ sd_context.last_pseudo_delete = (Uint) -1;
+
+ return match_traverse_continue(
+ sd_context.p, sd_context.tb, chunk_size,
+ MAX_SELECT_DELETE_ITERATIONS,
+ NULL, slot_ix, got, &mp, 1,
+ mtraversal_select_delete_on_match_res, /* Reuse callback */
+ mtraversal_select_delete_on_loop_ended, /* Reuse callback */
+ mtraversal_select_delete_on_trap, /* Reuse callback */
+ &sd_context, ret);
+}
+
+#undef MAX_SELECT_DELETE_ITERATIONS
+
+
+/*
+ *
+ * select_replace match traversal
+ *
+ */
+
+#define MAX_SELECT_REPLACE_ITERATIONS 1000
+
+typedef struct {
+ Process* p;
+ DbTableHash* tb;
+ Eterm tid;
+ Eterm* hp;
+ Eterm* prev_continuation_tptr;
+} mtraversal_select_replace_context_t;
+
+static int mtraversal_select_replace_on_nothing_can_match(void* context_ptr, Eterm* ret) {
+ *ret = make_small(0);
+ return DB_ERROR_NONE;
+}
+
+static int mtraversal_select_replace_on_match_res(void* context_ptr, Sint slot_ix,
+ HashDbTerm*** current_ptr_ptr,
+ Eterm match_res)
+{
+ mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
+ DbTableHash* tb = sr_context_ptr->tb;
+ HashDbTerm* new;
+ HashDbTerm* next;
+ HashValue hval;
+
+ if (is_value(match_res)) {
+#ifdef DEBUG
+ Eterm key = db_getkey(tb->common.keypos, match_res);
+ ASSERT(is_value(key));
+ ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl)));
+#endif
+ next = (**current_ptr_ptr)->next;
+ hval = (**current_ptr_ptr)->hvalue;
+ new = new_dbterm(tb, match_res);
+ new->next = next;
+ new->hvalue = hval;
+ free_term(tb, **current_ptr_ptr);
+ **current_ptr_ptr = new; /* replace 'next' pointer in previous object */
+ *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */
+ return 1;
+ }
+ return 0;
+}
+
+static int mtraversal_select_replace_on_loop_ended(void* context_ptr, Sint slot_ix, Sint got,
+ Sint iterations_left, Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
+ ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS);
+ /* the more objects we've replaced, the more reductions we've consumed */
+ BUMP_REDS(sr_context_ptr->p,
+ MIN(MAX_SELECT_REPLACE_ITERATIONS * 2,
+ (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got));
+ *ret = erts_make_integer(got, sr_context_ptr->p);
+ return DB_ERROR_NONE;
+}
+
+static int mtraversal_select_replace_on_trap(void* context_ptr, Sint slot_ix, Sint got,
+ Binary** mpp, Eterm* ret)
+{
+ mtraversal_select_replace_context_t* sr_context_ptr = (mtraversal_select_replace_context_t*) context_ptr;
+ return on_mtraversal_simple_trap(
+ &ets_select_replace_continue_exp,
+ sr_context_ptr->p,
+ sr_context_ptr->tb,
+ sr_context_ptr->tid,
+ sr_context_ptr->prev_continuation_tptr,
+ slot_ix, got, mpp, ret);
+}
+
+static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret)
+{
+ mtraversal_select_replace_context_t sr_context = {0};
+ Sint chunk_size = 0;
+
+ /* Bag implementation presented both semantic consistency and performance issues,
+ * unsupported for now
+ */
+ ASSERT(!(tbl->hash.common.status & DB_BAG));
+
+ sr_context.p = p;
+ sr_context.tb = &tbl->hash;
+ sr_context.tid = tid;
+ sr_context.hp = NULL;
+ sr_context.prev_continuation_tptr = NULL;
+
+ return match_traverse(
+ sr_context.p, sr_context.tb,
+ pattern, db_match_keeps_key,
+ chunk_size,
+ MAX_SELECT_REPLACE_ITERATIONS, NULL, 1,
+ mtraversal_select_replace_on_nothing_can_match,
+ mtraversal_select_replace_on_match_res,
+ mtraversal_select_replace_on_loop_ended,
+ mtraversal_select_replace_on_trap,
+ &sr_context, ret);
+}
+
+/*
+ * This is called when select_replace traps
+ */
+static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret)
+{
+ mtraversal_select_replace_context_t sr_context = {0};
+ Eterm* tptr;
+ Eterm tid ;
+ Binary* mp;
+ Sint got;
+ Sint slot_ix;
+ Sint chunk_size = 0;
+ *ret = NIL;
+
+ if (unpack_simple_mtraversal_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) {
+ *ret = NIL;
+ return DB_ERROR_BADPARAM;
+ }
+
+ /* Proceed */
+ sr_context.p = p;
+ sr_context.tb = &tbl->hash;
+ sr_context.tid = tid;
+ sr_context.hp = NULL;
+ sr_context.prev_continuation_tptr = tptr;
+
+ return match_traverse_continue(
+ sr_context.p, sr_context.tb, chunk_size,
+ MAX_SELECT_REPLACE_ITERATIONS,
+ NULL, slot_ix, got, &mp, 1,
+ mtraversal_select_replace_on_match_res, /* Reuse callback */
+ mtraversal_select_replace_on_loop_ended, /* Reuse callback */
+ mtraversal_select_replace_on_trap, /* Reuse callback */
+ &sr_context, ret);
+}
+
+
static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
{
DbTableHash *tb = &tbl->hash;
@@ -2123,6 +2278,7 @@ static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret)
return DB_ERROR_NONE;
}
+
/*
** Other interface routines (not directly coupled to one bif)
*/
@@ -2219,19 +2375,17 @@ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl)
/* release all memory occupied by a single table */
static int db_free_table_hash(DbTable *tbl)
{
- while (!db_free_table_continue_hash(tbl))
+ while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0)
;
return 0;
}
-static int db_free_table_continue_hash(DbTable *tbl)
+static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
{
DbTableHash *tb = &tbl->hash;
- int done;
FixedDeletion* fixdel = (FixedDeletion*) erts_smp_atomic_read_acqb(&tb->fixdel);
- ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb));
+ ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE));
- done = 0;
while (fixdel != NULL) {
FixedDeletion *fx = fixdel;
@@ -2241,22 +2395,21 @@ static int db_free_table_continue_hash(DbTable *tbl)
(void *) fx,
sizeof(FixedDeletion));
ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
- if (++done >= 2*DELETE_RECORD_LIMIT) {
+ if (--reds < 0) {
erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
- return 0; /* Not done */
+ return reds; /* Not done */
}
}
erts_smp_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL);
- done /= 2;
while(tb->nslots != 0) {
- free_seg(tb, 1);
+ reds -= EXT_SEGSZ/64 + free_seg(tb, 1);
/*
* If we have done enough work, get out here.
*/
- if (++done >= (DELETE_RECORD_LIMIT / CHAIN_LEN / SEGSZ)) {
- return 0; /* Not done */
+ if (reds < 0) {
+ return reds; /* Not done */
}
}
#ifdef ERTS_SMP
@@ -2271,7 +2424,7 @@ static int db_free_table_continue_hash(DbTable *tbl)
}
#endif
ASSERT(erts_smp_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable));
- return 1; /* Done */
+ return reds; /* Done */
}
@@ -2284,7 +2437,8 @@ static int db_free_table_continue_hash(DbTable *tbl)
** slots should be searched. Also compiles the match program
*/
static int analyze_pattern(DbTableHash *tb, Eterm pattern,
- struct mp_info *mpi)
+ extra_match_validator_t extra_validator, /* Optional callback */
+ struct mp_info *mpi)
{
Eterm *ptpl;
Eterm lst, tpl, ttpl;
@@ -2322,7 +2476,10 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
i = 0;
for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) {
- Eterm body;
+ Eterm match;
+ Eterm guard;
+ Eterm body;
+
ttpl = CAR(list_val(lst));
if (!is_tuple(ttpl)) {
if (buff != sbuff) {
@@ -2337,9 +2494,17 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
}
return DB_ERROR_BADPARAM;
}
- matches[i] = tpl = ptpl[1];
- guards[i] = ptpl[2];
+ matches[i] = match = tpl = ptpl[1];
+ guards[i] = guard = ptpl[2];
bodies[i] = body = ptpl[3];
+
+ if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) {
+ if (buff != sbuff) {
+ erts_free(ERTS_ALC_T_DB_TMP, buff);
+ }
+ return DB_ERROR_BADPARAM;
+ }
+
if (!is_list(body) || CDR(list_val(body)) != NIL ||
CAR(list_val(body)) != am_DollarUnderscore) {
mpi->all_objects = 0;
@@ -2414,90 +2579,81 @@ static int analyze_pattern(DbTableHash *tb, Eterm pattern,
return DB_ERROR_NONE;
}
-static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix,
- struct segment** old_segtab)
+static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix)
{
- int nsegs;
- struct ext_segment* eseg;
+ struct segment** old_segtab = SEGTAB(tb);
+ int nsegs = 0;
+ struct ext_segtab* est;
+ ASSERT(seg_ix >= NSEG_1);
switch (seg_ix) {
- case 0: nsegs = NSEG_1; break;
- case 1: nsegs = NSEG_2; break;
- default: nsegs = seg_ix + NSEG_INC; break;
- }
- eseg = (struct ext_segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
- (DbTable *) tb,
- SIZEOF_EXTSEG(nsegs));
- ASSERT(eseg != NULL);
- sys_memset(&eseg->s, 0, sizeof(struct segment));
- IF_DEBUG(eseg->s.is_ext_segment = 1);
- eseg->prev_segtab = old_segtab;
- eseg->nsegs = nsegs;
- if (old_segtab) {
- ASSERT(nsegs > tb->nsegs);
- sys_memcpy(eseg->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
- }
+ case NSEG_1: nsegs = NSEG_2; break;
+ default: nsegs = seg_ix + NSEG_INC; break;
+ }
+ ASSERT(nsegs > tb->nsegs);
+ est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_EXT_SEGTAB(nsegs));
+ est->nsegs = nsegs;
+ est->prev_segtab = old_segtab;
+ est->prev_nsegs = tb->nsegs;
+ sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*));
#ifdef DEBUG
- sys_memset(&eseg->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
+ sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*));
#endif
- eseg->segtab[seg_ix] = &eseg->s;
- return eseg;
+ return est;
}
/* Extend table with one new segment
*/
-static int alloc_seg(DbTableHash *tb)
+static void alloc_seg(DbTableHash *tb)
{
- int seg_ix = tb->nslots >> SEGSZ_EXP;
-
- if (seg_ix+1 == tb->nsegs) { /* New segtab needed (extended segment) */
- struct segment** segtab = SEGTAB(tb);
- struct ext_segment* seg = alloc_ext_seg(tb, seg_ix, segtab);
- if (seg == NULL) return 0;
- segtab[seg_ix] = &seg->s;
- /* We don't use the new segtab until next call (see "shrink race") */
- }
- else { /* Just a new plain segment */
- struct segment** segtab;
- if (seg_ix == tb->nsegs) { /* Time to start use segtab from last call */
- struct ext_segment* eseg;
- eseg = (struct ext_segment*) SEGTAB(tb)[seg_ix-1];
- MY_ASSERT(eseg!=NULL && eseg->s.is_ext_segment);
- SET_SEGTAB(tb, eseg->segtab);
- tb->nsegs = eseg->nsegs;
- }
- ASSERT(seg_ix < tb->nsegs);
- segtab = SEGTAB(tb);
- ASSERT(segtab[seg_ix] == NULL);
- segtab[seg_ix] = (struct segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG,
- (DbTable *) tb,
- sizeof(struct segment));
- if (segtab[seg_ix] == NULL) return 0;
- sys_memset(segtab[seg_ix], 0, sizeof(struct segment));
- }
- tb->nslots += SEGSZ;
- return 1;
+ int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots);
+ struct segment** segtab;
+
+ ASSERT(seg_ix > 0);
+ if (seg_ix == tb->nsegs) { /* New segtab needed */
+ struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix);
+ SET_SEGTAB(tb, est->segtab);
+ tb->nsegs = est->nsegs;
+ }
+ ASSERT(seg_ix < tb->nsegs);
+ segtab = SEGTAB(tb);
+ segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG,
+ (DbTable *) tb,
+ SIZEOF_SEGMENT(EXT_SEGSZ));
+ sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ));
+ tb->nslots += EXT_SEGSZ;
}
+#ifdef ERTS_SMP
+static void dealloc_ext_segtab(void* lop_data)
+{
+ struct ext_segtab* est = (struct ext_segtab*) lop_data;
+
+ erts_free(ERTS_ALC_T_DB_SEG, est);
+}
+#endif
+
/* Shrink table by freeing the top segment
** free_records: 1=free any records in segment, 0=assume segment is empty
*/
static int free_seg(DbTableHash *tb, int free_records)
{
- const int seg_ix = (tb->nslots >> SEGSZ_EXP) - 1;
+ const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1;
struct segment** const segtab = SEGTAB(tb);
- struct ext_segment* const top = (struct ext_segment*) segtab[seg_ix];
- int bytes;
+ struct segment* const segp = segtab[seg_ix];
+ Uint seg_sz;
int nrecords = 0;
- ASSERT(top != NULL);
+ ASSERT(segp != NULL);
#ifndef DEBUG
if (free_records)
#endif
{
- int i;
- for (i=0; i<SEGSZ; ++i) {
- HashDbTerm* p = top->s.buckets[i];
+ int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
+ while (i--) {
+ HashDbTerm* p = segp->buckets[i];
while(p != 0) {
HashDbTerm* nxt = p->next;
ASSERT(free_records); /* segment not empty as assumed? */
@@ -2507,55 +2663,46 @@ static int free_seg(DbTableHash *tb, int free_records)
}
}
}
-
- /* The "shrink race":
- * We must avoid deallocating an extended segment while its segtab may
- * still be used by other threads.
- * The trick is to stop use a segtab one call earlier. That is, stop use
- * a segtab when the segment above it is deallocated. When the segtab is
- * later deallocated, it has not been used for a very long time.
- * It is even theoretically safe as we have by then rehashed the entire
- * segment, seizing *all* locks, so there cannot exist any retarded threads
- * still hanging in BUCKET macro with an old segtab pointer.
- * For this to work, we must of course allocate a new segtab one call
- * earlier in alloc_seg() as well. And this is also the reason why
- * the minimum size of the first segtab is 2 and not 1 (NSEG_1).
- */
- if (seg_ix == tb->nsegs-1 || seg_ix==0) { /* Dealloc extended segment */
- MY_ASSERT(top->s.is_ext_segment);
- ASSERT(segtab != top->segtab || seg_ix==0);
- bytes = SIZEOF_EXTSEG(top->nsegs);
- }
- else { /* Dealloc plain segment */
- struct ext_segment* newtop = (struct ext_segment*) segtab[seg_ix-1];
- MY_ASSERT(!top->s.is_ext_segment);
-
- if (segtab == newtop->segtab) { /* New top segment is extended */
- MY_ASSERT(newtop->s.is_ext_segment);
- if (newtop->prev_segtab != NULL) {
- /* Time to use a smaller segtab */
- SET_SEGTAB(tb, newtop->prev_segtab);
- tb->nsegs = seg_ix;
- ASSERT(tb->nsegs == EXTSEG(SEGTAB(tb))->nsegs);
- }
- else {
- ASSERT(NSEG_1 > 2 && seg_ix==1);
- }
- }
- bytes = sizeof(struct segment);
+ if (seg_ix >= NSEG_1) {
+ struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab);
+
+ if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */
+ ASSERT(est->prev_segtab != NULL);
+ SET_SEGTAB(tb, est->prev_segtab);
+ tb->nsegs = est->prev_nsegs;
+
+#ifdef ERTS_SMP
+ if (!tb->common.is_thread_safe) {
+ /*
+ * Table is doing a graceful shrink operation and we must avoid
+ * deallocating this segtab while it may still be read by other
+ * threads. Schedule deallocation with thread progress to make
+ * sure no lingering threads are still hanging in BUCKET macro
+ * with an old segtab pointer.
+ */
+ Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs);
+ ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est));
+ ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0);
+ erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab,
+ est,
+ &est->lop,
+ sz);
+ }
+ else
+#endif
+ erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est,
+ SIZEOF_EXT_SEGTAB(est->nsegs));
+ }
}
+ seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ;
+ erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz));
- erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb,
- (void*)top, bytes);
#ifdef DEBUG
- if (seg_ix > 0) {
- segtab[seg_ix] = NULL;
- } else {
- SET_SEGTAB(tb, NULL);
- }
+ if (seg_ix < tb->nsegs)
+ SEGTAB(tb)[seg_ix] = NULL;
#endif
- tb->nslots -= SEGSZ;
+ tb->nslots -= seg_sz;
ASSERT(tb->nslots >= 0);
return nrecords;
}
@@ -2604,104 +2751,111 @@ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2,
static ERTS_INLINE int
begin_resizing(DbTableHash* tb)
{
+#ifdef ERTS_SMP
if (DB_USING_FINE_LOCKING(tb))
- return !erts_smp_atomic_xchg_acqb(&tb->is_resizing, 1);
- else {
- if (erts_smp_atomic_read_nob(&tb->is_resizing))
- return 0;
- erts_smp_atomic_set_nob(&tb->is_resizing, 1);
- return 1;
- }
+ return !erts_atomic_xchg_acqb(&tb->is_resizing, 1);
+ else
+ ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock));
+#endif
+ return 1;
}
static ERTS_INLINE void
done_resizing(DbTableHash* tb)
{
+#ifdef ERTS_SMP
if (DB_USING_FINE_LOCKING(tb))
- erts_smp_atomic_set_relb(&tb->is_resizing, 0);
- else
- erts_smp_atomic_set_nob(&tb->is_resizing, 0);
+ erts_atomic_set_relb(&tb->is_resizing, 0);
+#endif
}
-/* Grow table with one new bucket.
+/* Grow table with one or more new buckets.
** Allocate new segment if needed.
*/
-static void grow(DbTableHash* tb, int nactive)
+static void grow(DbTableHash* tb, int nitems)
{
HashDbTerm** pnext;
HashDbTerm** to_pnext;
HashDbTerm* p;
erts_smp_rwmtx_t* lck;
- int from_ix;
+ int nactive;
+ int from_ix, to_ix;
int szm;
+ int loop_limit = 5;
- if (!begin_resizing(tb))
- return; /* already in progress */
- if (NACTIVE(tb) != nactive) {
- goto abort; /* already done (race) */
- }
-
- /* Ensure that the slot nactive exists */
- if (nactive == tb->nslots) {
- /* Time to get a new segment */
- ASSERT((nactive & SEGSZ_MASK) == 0);
- if (!alloc_seg(tb)) goto abort;
- }
- ASSERT(nactive < tb->nslots);
+ do {
+ if (!begin_resizing(tb))
+ return; /* already in progress */
+ nactive = NACTIVE(tb);
+ if (nitems <= GROW_LIMIT(nactive)) {
+ goto abort; /* already done (race) */
+ }
- szm = erts_smp_atomic_read_nob(&tb->szm);
- if (nactive <= szm) {
- from_ix = nactive & (szm >> 1);
- } else {
- ASSERT(nactive == szm+1);
- from_ix = 0;
- szm = (szm<<1) | 1;
- }
+ /* Ensure that the slot nactive exists */
+ if (nactive == tb->nslots) {
+ /* Time to get a new segment */
+ ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0);
+ alloc_seg(tb);
+ }
+ ASSERT(nactive < tb->nslots);
- lck = WLOCK_HASH(tb, from_ix);
- /* Now a final double check (with the from_ix lock held)
- * that we did not get raced by a table fixer.
- */
- if (IS_FIXED(tb)) {
- WUNLOCK_HASH(lck);
- goto abort;
- }
- erts_smp_atomic_inc_nob(&tb->nactive);
- if (from_ix == 0) {
- if (DB_USING_FINE_LOCKING(tb))
- erts_smp_atomic_set_relb(&tb->szm, szm);
- else
- erts_smp_atomic_set_nob(&tb->szm, szm);
- }
- done_resizing(tb);
+ szm = erts_smp_atomic_read_nob(&tb->szm);
+ if (nactive <= szm) {
+ from_ix = nactive & (szm >> 1);
+ } else {
+ ASSERT(nactive == szm+1);
+ from_ix = 0;
+ szm = (szm<<1) | 1;
+ }
+ to_ix = nactive;
+
+ lck = WLOCK_HASH(tb, from_ix);
+ ERTS_SMP_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix));
+ /* Now a final double check (with the from_ix lock held)
+ * that we did not get raced by a table fixer.
+ */
+ if (IS_FIXED(tb)) {
+ WUNLOCK_HASH(lck);
+ goto abort;
+ }
+ erts_smp_atomic_set_nob(&tb->nactive, ++nactive);
+ if (from_ix == 0) {
+ if (DB_USING_FINE_LOCKING(tb))
+ erts_smp_atomic_set_relb(&tb->szm, szm);
+ else
+ erts_smp_atomic_set_nob(&tb->szm, szm);
+ }
+ done_resizing(tb);
+
+ /* Finally, let's split the bucket. We try to do it in a smart way
+ to keep link order and avoid unnecessary updates of next-pointers */
+ pnext = &BUCKET(tb, from_ix);
+ p = *pnext;
+ to_pnext = &BUCKET(tb, to_ix);
+ while (p != NULL) {
+ if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */
+ *pnext = p->next;
+ free_term(tb, p);
+ p = *pnext;
+ }
+ else {
+ int ix = p->hvalue & szm;
+ if (ix != from_ix) {
+ ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
+ *to_pnext = p;
+ /* Swap "from" and "to": */
+ from_ix = ix;
+ to_pnext = pnext;
+ }
+ pnext = &p->next;
+ p = *pnext;
+ }
+ }
+ *to_pnext = NULL;
+ WUNLOCK_HASH(lck);
- /* Finally, let's split the bucket. We try to do it in a smart way
- to keep link order and avoid unnecessary updates of next-pointers */
- pnext = &BUCKET(tb, from_ix);
- p = *pnext;
- to_pnext = &BUCKET(tb, nactive);
- while (p != NULL) {
- if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */
- *pnext = p->next;
- free_term(tb, p);
- p = *pnext;
- }
- else {
- int ix = p->hvalue & szm;
- if (ix != from_ix) {
- ASSERT(ix == (from_ix ^ ((szm+1)>>1)));
- *to_pnext = p;
- /* Swap "from" and "to": */
- from_ix = ix;
- to_pnext = pnext;
- }
- pnext = &p->next;
- p = *pnext;
- }
- }
- *to_pnext = NULL;
+ }while (--loop_limit && nitems > GROW_LIMIT(nactive));
- WUNLOCK_HASH(lck);
return;
abort:
@@ -2712,60 +2866,78 @@ abort:
/* Shrink table by joining top bucket.
** Remove top segment if it gets empty.
*/
-static void shrink(DbTableHash* tb, int nactive)
-{
- if (!begin_resizing(tb))
- return; /* already in progress */
- if (NACTIVE(tb) == nactive) {
- erts_smp_rwmtx_t* lck;
- int src_ix = nactive - 1;
- int low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1;
- int dst_ix = src_ix & low_szm;
-
- ASSERT(dst_ix < src_ix);
- ASSERT(nactive > SEGSZ);
- lck = WLOCK_HASH(tb, dst_ix);
- /* Double check for racing table fixers */
- if (!IS_FIXED(tb)) {
- HashDbTerm** src_bp = &BUCKET(tb, src_ix);
- HashDbTerm** dst_bp = &BUCKET(tb, dst_ix);
- HashDbTerm** bp = src_bp;
-
- /* Q: Why join lists by appending "dst" at the end of "src"?
- A: Must step through "src" anyway to purge pseudo deleted. */
- while(*bp != NULL) {
- if ((*bp)->hvalue == INVALID_HASH) {
- HashDbTerm* deleted = *bp;
- *bp = deleted->next;
- free_term(tb, deleted);
- } else {
- bp = &(*bp)->next;
- }
- }
- *bp = *dst_bp;
- *dst_bp = *src_bp;
- *src_bp = NULL;
-
- erts_smp_atomic_set_nob(&tb->nactive, src_ix);
- if (dst_ix == 0) {
- erts_smp_atomic_set_relb(&tb->szm, low_szm);
- }
- WUNLOCK_HASH(lck);
-
- if (tb->nslots - src_ix >= SEGSZ) {
- free_seg(tb, 0);
- }
- }
- else {
- WUNLOCK_HASH(lck);
- }
+static void shrink(DbTableHash* tb, int nitems)
+{
+ HashDbTerm** src_bp;
+ HashDbTerm** dst_bp;
+ HashDbTerm** bp;
+ erts_smp_rwmtx_t* lck;
+ int src_ix, dst_ix, low_szm;
+ int nactive;
+ int loop_limit = 5;
- }
- /*else already done */
+ do {
+ if (!begin_resizing(tb))
+ return; /* already in progress */
+ nactive = NACTIVE(tb);
+ if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) {
+ goto abort; /* already done (race) */
+ }
+ src_ix = nactive - 1;
+ low_szm = erts_smp_atomic_read_nob(&tb->szm) >> 1;
+ dst_ix = src_ix & low_szm;
+
+ ASSERT(dst_ix < src_ix);
+ ASSERT(nactive > FIRST_SEGSZ);
+ lck = WLOCK_HASH(tb, dst_ix);
+ ERTS_SMP_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix));
+ /* Double check for racing table fixers */
+ if (IS_FIXED(tb)) {
+ WUNLOCK_HASH(lck);
+ goto abort;
+ }
+
+ src_bp = &BUCKET(tb, src_ix);
+ dst_bp = &BUCKET(tb, dst_ix);
+ bp = src_bp;
+
+ /*
+ * We join lists by appending "dst" at the end of "src"
+ * as we must step through "src" anyway to purge pseudo deleted.
+ */
+ while(*bp != NULL) {
+ if ((*bp)->hvalue == INVALID_HASH) {
+ HashDbTerm* deleted = *bp;
+ *bp = deleted->next;
+ free_term(tb, deleted);
+ } else {
+ bp = &(*bp)->next;
+ }
+ }
+ *bp = *dst_bp;
+ *dst_bp = *src_bp;
+ *src_bp = NULL;
+
+ nactive = src_ix;
+ erts_smp_atomic_set_nob(&tb->nactive, nactive);
+ if (dst_ix == 0) {
+ erts_smp_atomic_set_relb(&tb->szm, low_szm);
+ }
+ WUNLOCK_HASH(lck);
+
+ if (tb->nslots - src_ix >= EXT_SEGSZ) {
+ free_seg(tb, 0);
+ }
+ done_resizing(tb);
+
+ } while (--loop_limit
+ && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive));
+ return;
+
+abort:
done_resizing(tb);
}
-
/* Search a list of tuples for a matching key */
static HashDbTerm* search_list(DbTableHash* tb, Eterm key,
@@ -2931,8 +3103,8 @@ db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle)
WUNLOCK_HASH(lck);
nactive = NACTIVE(tb);
- if (nitems > nactive * (CHAIN_LEN + 1) && !IS_FIXED(tb)) {
- grow(tb, nactive);
+ if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) {
+ grow(tb, nitems);
}
} else {
WUNLOCK_HASH(lck);
@@ -3016,24 +3188,30 @@ void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats)
stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb)));
stats->kept_items = kept_items;
}
-#ifdef HARDDEBUG
-void db_check_table_hash(DbTable *tbl)
+/* For testing only */
+Eterm erts_ets_hash_sizeof_ext_segtab(void)
{
- DbTableHash *tb = &tbl->hash;
- HashDbTerm* list;
- int j;
-
- for (j = 0; j < tb->nactive; j++) {
- if ((list = BUCKET(tb,j)) != 0) {
- while (list != 0) {
- if (!is_tuple(make_tuple(list->dbterm.tpl))) {
- erts_exit(ERTS_ERROR_EXIT, "Bad term in slot %d of ets table", j);
- }
- list = list->next;
- }
- }
- }
+ return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1);
}
-#endif
+#ifdef ERTS_ENABLE_LOCK_COUNT
+void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) {
+ int i;
+
+ if(tb->locks == NULL) {
+ return;
+ }
+
+ for(i = 0; i < DB_HASH_LOCK_CNT; i++) {
+ erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt;
+
+ if(enable) {
+ erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name,
+ ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB);
+ } else {
+ erts_lcnt_uninstall(ref);
+ }
+ }
+}
+#endif /* ERTS_ENABLE_LOCK_COUNT */