/* * %CopyrightBegin% * * Copyright Ericsson AB 1998-2017. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * %CopyrightEnd% */ /* ** Implementation of unordered ETS tables. ** The tables are implemented as linear dynamic hash tables. ** https://en.wikipedia.org/wiki/Linear_hashing */ /* SMP: ** The hash table supports two different locking "modes", ** coarse grained and fine grained locking. ** ** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain ** the right kind of lock on the entire table depending on operation (reading ** or writing). No further locking is then done by the table itself. ** ** Fine grained locking is supported by this code to allow concurrent updates ** (and reading) to different parts of the table. This works by keeping one ** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by ** rehashing buckets can be done without exclusive table lock. ** ** A table will support fine grained locking if it is created with flag ** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate ** if operations need to obtain fine grained locks or not. Some operations ** will for example always use exclusive table lock to guarantee ** a higher level of atomicity. */ /* FIXATION: ** Fixating the table, by ets:safe_fixtable or as done by select-operations, ** guarantees two things in current implementation. ** (1) Keys will not *totaly* disappear from the table. A key can thus be used ** as an iterator to find the next key in iteration sequence. Note however ** that this does not mean that (pointers to) table objects are guaranteed ** to be maintained while the table is fixated. A BAG or DBAG may actually ** remove objects as long as there is at least one object left in the table ** with the same key (alive or pseudo-deleted). ** (2) Objects will not be moved between buckets due to table grow/shrink. ** This will guarantee that iterations do not miss keys or get double-hits. ** ** With fine grained locking, a concurrent thread can fixate the table at any ** time. A "dangerous" operation (delete or move) therefore needs to check ** if the table is fixated while write-locking the bucket. */ /* #ifdef DEBUG #define HARDDEBUG 1 #endif */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "sys.h" #include "erl_vm.h" #include "global.h" #include "erl_process.h" #include "error.h" #define ERTS_WANT_DB_INTERNAL__ #include "erl_db.h" #include "bif.h" #include "big.h" #include "export.h" #include "erl_binary.h" #include "erl_db_hash.h" /* * The following symbols can be manipulated to "tune" the linear hash array */ #define GROW_LIMIT(NACTIVE) ((NACTIVE)*1) #define SHRINK_LIMIT(NACTIVE) ((NACTIVE) / 2) /* ** We want the first mandatory segment to be small (to reduce minimal footprint) ** and larger extra segments (to reduce number of alloc/free calls). */ /* Number of slots in first segment */ #define FIRST_SEGSZ_EXP 8 #define FIRST_SEGSZ (1 << FIRST_SEGSZ_EXP) #define FIRST_SEGSZ_MASK (FIRST_SEGSZ - 1) /* Number of slots per extra segment */ #define EXT_SEGSZ_EXP 11 #define EXT_SEGSZ (1 << EXT_SEGSZ_EXP) #define EXT_SEGSZ_MASK (EXT_SEGSZ-1) #define NSEG_1 (ErtsSizeofMember(DbTableHash,first_segtab) / sizeof(struct segment*)) #define NSEG_2 256 /* Size of second segment table */ #define NSEG_INC 128 /* Number of segments to grow after that */ # define DB_USING_FINE_LOCKING(TB) (((TB))->common.type & DB_FINE_LOCKED) #ifdef ETHR_ORDERED_READ_DEPEND #define SEGTAB(tb) ((struct segment**) erts_atomic_read_nob(&(tb)->segtab)) #else #define SEGTAB(tb) \ (DB_USING_FINE_LOCKING(tb) \ ? ((struct segment**) erts_atomic_read_ddrb(&(tb)->segtab)) \ : ((struct segment**) erts_atomic_read_nob(&(tb)->segtab))) #endif #define NACTIVE(tb) ((int)erts_atomic_read_nob(&(tb)->nactive)) #define NITEMS(tb) ((int)erts_atomic_read_nob(&(tb)->common.nitems)) #define SLOT_IX_TO_SEG_IX(i) (((i)+(EXT_SEGSZ-FIRST_SEGSZ)) >> EXT_SEGSZ_EXP) #define BUCKET(tb, i) SEGTAB(tb)[SLOT_IX_TO_SEG_IX(i)]->buckets[(i) & EXT_SEGSZ_MASK] /* * When deleting a table, the number of records to delete. * Approximate number, because we must delete entire buckets. */ #define DELETE_RECORD_LIMIT 10000 /* Calculate slot index from hash value. ** RLOCK_HASH or WLOCK_HASH must be done before. */ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval) { Uint mask = (DB_USING_FINE_LOCKING(tb) ? erts_atomic_read_acqb(&tb->szm) : erts_atomic_read_nob(&tb->szm)); Uint ix = hval & mask; if (ix >= erts_atomic_read_nob(&tb->nactive)) { ix &= mask>>1; ASSERT(ix < erts_atomic_read_nob(&tb->nactive)); } return ix; } static ERTS_INLINE int link_fixdel(DbTableHash* tb, FixedDeletion* fixd, erts_aint_t fixated_by_me) { erts_aint_t was_next; erts_aint_t exp_next; was_next = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic insertion in linked list: */ if (NFIXED(tb) <= fixated_by_me) { erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb, fixd, sizeof(FixedDeletion)); return 0; /* raced by unfixer */ } exp_next = was_next; fixd->next = (FixedDeletion*) exp_next; was_next = erts_atomic_cmpxchg_mb(&tb->fixdel, (erts_aint_t) fixd, exp_next); }while (was_next != exp_next); return 1; } /* Remember a slot containing a pseudo-deleted item * Return false if we got raced by unfixing thread * and the object should be deleted for real. */ static int add_fixed_deletion(DbTableHash* tb, int ix, erts_aint_t fixated_by_me) { FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); fixd->slot = ix; fixd->all = 0; return link_fixdel(tb, fixd, fixated_by_me); } static ERTS_INLINE int is_pseudo_deleted(HashDbTerm* p) { return p->pseudo_deleted; } /* optimised version of make_hash (normal case? atomic key) */ #define MAKE_HASH(term) \ ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ make_internal_hash(term, 0)) & MAX_HASH_MASK) # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1) # define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck) # define GET_LOCK_MAYBE(tb,hval) ((tb)->common.is_thread_safe ? NULL : GET_LOCK(tb,hval)) /* Fine grained read lock */ static ERTS_INLINE erts_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { erts_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); erts_rwmtx_rlock(lck); return lck; } } /* Fine grained write lock */ static ERTS_INLINE erts_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { erts_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); erts_rwmtx_rwlock(lck); return lck; } } static ERTS_INLINE void RUNLOCK_HASH(erts_rwmtx_t* lck) { if (lck != NULL) { erts_rwmtx_runlock(lck); } } static ERTS_INLINE void WUNLOCK_HASH(erts_rwmtx_t* lck) { if (lck != NULL) { erts_rwmtx_rwunlock(lck); } } #ifdef ERTS_ENABLE_LOCK_CHECK # define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd)) # define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval))) # define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_lc_rwmtx_is_rwlocked(lck)) # define IS_TAB_WLOCKED(tb) erts_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock) #else # define IS_HASH_RLOCKED(tb,hval) (1) # define IS_HASH_WLOCKED(tb,hval) (1) # define IS_TAB_WLOCKED(tb) (1) #endif /* Iteration helper ** Returns "next" slot index or 0 if EOT reached. ** Slot READ locks updated accordingly, unlocked if EOT. */ static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr) { ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; RUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix); return ix; } /* Same as next_slot but with WRITE locking */ static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr) { ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; WUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix); return ix; } static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p) { db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm)); } static ERTS_INLINE void free_term_list(DbTableHash *tb, HashDbTerm* p) { while (p) { HashDbTerm* next = p->next; free_term(tb, p); p = next; } } /* * Local types */ struct mp_prefound { HashDbTerm** bucket; int ix; }; struct mp_info { int something_can_match; /* The match_spec is not "impossible" */ int key_given; struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */ struct mp_prefound* lists; /* Buckets to search if keys are given, * = dlists initially */ unsigned num_lists; /* Number of elements in "lists", * = 0 initially */ Binary *mp; /* The compiled match program */ }; /* A table segment */ struct segment { HashDbTerm* buckets[1]; }; #define SIZEOF_SEGMENT(N) \ (offsetof(struct segment,buckets) + sizeof(HashDbTerm*)*(N)) /* An extended segment table */ struct ext_segtab { ErtsThrPrgrLaterOp lop; struct segment** prev_segtab; /* Used when table is shrinking */ int prev_nsegs; /* Size of prev_segtab */ int nsegs; /* Size of this segtab */ struct segment* segtab[1]; /* The segment table */ }; #define SIZEOF_EXT_SEGTAB(NSEGS) \ (offsetof(struct ext_segtab,segtab) + sizeof(struct segment*)*(NSEGS)) static ERTS_INLINE void SET_SEGTAB(DbTableHash* tb, struct segment** segtab) { if (DB_USING_FINE_LOCKING(tb)) erts_atomic_set_wb(&tb->segtab, (erts_aint_t) segtab); else erts_atomic_set_nob(&tb->segtab, (erts_aint_t) segtab); } /* Used by select_replace on analyze_pattern */ typedef int (*extra_match_validator_t)(int keypos, Eterm match, Eterm guard, Eterm body); /* ** Forward decl's (static functions) */ static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix); static void alloc_seg(DbTableHash *tb); static int free_seg(DbTableHash *tb, int free_records); static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr, HashDbTerm *list); static HashDbTerm* search_list(DbTableHash* tb, Eterm key, HashValue hval, HashDbTerm *list); static void shrink(DbTableHash* tb, int nitems); static void grow(DbTableHash* tb, int nitems); static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, Uint sz, DbTableHash*); static int analyze_pattern(DbTableHash *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi); /* * Method interface functions */ static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret); static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret); static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret); static int db_get_element_hash(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret); static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret); static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret); static int db_select_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret); static int db_select_count_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret); static int db_select_delete_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret); static int db_select_replace_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_take_hash(Process *, DbTable *, Eterm, Eterm *); static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl); static int db_free_table_hash(DbTable *tbl); static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds); static void db_foreach_offheap_hash(DbTable *, void (*)(ErlOffHeap *, void *), void *); static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds); #ifdef HARDDEBUG static void db_check_table_hash(DbTableHash *tb); #endif static int db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, DbUpdateHandle* handle); static void db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle); static ERTS_INLINE void try_shrink(DbTableHash* tb) { int nactive = NACTIVE(tb); int nitems = NITEMS(tb); if (nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive) && !IS_FIXED(tb)) { shrink(tb, nitems); } } /* Is this a live object (not pseodo-deleted) with the specified key? */ static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { if (b->hvalue != hval || is_pseudo_deleted(b)) return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); return EQ(key, itemKey); } } /* Has this object the specified key? Can be pseudo-deleted. */ static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { if (b->hvalue != hval) return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); return EQ(key, itemKey); } } static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj) { HashDbTerm* p; if (tb->common.compress) { p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj); } else { p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj); } return p; } static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old, Eterm obj) { HashDbTerm* ret; ASSERT(old != NULL); if (tb->common.compress) { ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj); } else { ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj); } return ret; } /* ** External interface */ DbTableMethod db_hash = { db_create_hash, db_first_hash, db_next_hash, db_first_hash, /* last == first */ db_next_hash, /* prev == next */ db_put_hash, db_get_hash, db_get_element_hash, db_member_hash, db_erase_hash, db_erase_object_hash, db_slot_hash, db_select_chunk_hash, db_select_hash, db_select_delete_hash, db_select_continue_hash, /* hmm continue_hash? */ db_select_delete_continue_hash, db_select_count_hash, db_select_count_continue_hash, db_select_replace_hash, db_select_replace_continue_hash, db_take_hash, db_delete_all_objects_hash, db_free_table_hash, db_free_table_continue_hash, db_print_hash, db_foreach_offheap_hash, db_lookup_dbterm_hash, db_finalize_dbterm_hash }; #ifdef DEBUG /* Wait a while to provoke race and get code coverage */ static void DEBUG_WAIT(void) { unsigned long spin = 1UL << 20; while (--spin); } #else # define DEBUG_WAIT() #endif /* Rare case of restoring the rest of the fixdel list when "unfixer" gets interrupted by "fixer" */ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel) { /*int tries = 0;*/ DEBUG_WAIT(); if (erts_atomic_cmpxchg_relb(&tb->fixdel, (erts_aint_t) fixdel, (erts_aint_t) NULL) != (erts_aint_t) NULL) { /* Oboy, must join lists */ FixedDeletion* last = fixdel; erts_aint_t was_tail; erts_aint_t exp_tail; while (last->next != NULL) last = last->next; was_tail = erts_atomic_read_acqb(&tb->fixdel); do { /* Lockless atomic list insertion */ exp_tail = was_tail; last->next = (FixedDeletion*) exp_tail; /*++tries;*/ DEBUG_WAIT(); was_tail = erts_atomic_cmpxchg_relb(&tb->fixdel, (erts_aint_t) fixdel, exp_tail); }while (was_tail != exp_tail); } /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/ } /* ** Table interface routines ie what's called by the bif's */ SWord db_unfix_table_hash(DbTableHash *tb) { FixedDeletion* fixdel; SWord work = 0; ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock) || (erts_lc_rwmtx_is_rlocked(&tb->common.rwlock) && !tb->common.is_thread_safe)); restart: fixdel = (FixedDeletion*) erts_atomic_xchg_mb(&tb->fixdel, (erts_aint_t) NULL); while (fixdel) { FixedDeletion *free_me; do { HashDbTerm **bp; HashDbTerm *b; HashDbTerm *free_us = NULL; erts_rwmtx_t* lck; lck = WLOCK_HASH(tb, fixdel->slot); if (IS_FIXED(tb)) { /* interrupted by fixer */ WUNLOCK_HASH(lck); restore_fixdel(tb,fixdel); if (!IS_FIXED(tb)) { goto restart; /* unfixed again! */ } return work; } if (fixdel->slot < NACTIVE(tb)) { bp = &BUCKET(tb, fixdel->slot); b = *bp; while (b != NULL) { if (is_pseudo_deleted(b)) { HashDbTerm* nxt = b->next; b->next = free_us; free_us = b; work++; b = *bp = nxt; } else { bp = &b->next; b = b->next; } } } /* else slot has been joined and purged by shrink() */ WUNLOCK_HASH(lck); free_term_list(tb, free_us); }while (fixdel->all && fixdel->slot-- > 0); free_me = fixdel; fixdel = fixdel->next; erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, (void *) free_me, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); work++; } /* ToDo: Maybe try grow/shrink the table as well */ return work; } int db_create_hash(Process *p, DbTable *tbl) { DbTableHash *tb = &tbl->hash; erts_atomic_init_nob(&tb->szm, FIRST_SEGSZ_MASK); erts_atomic_init_nob(&tb->nactive, FIRST_SEGSZ); erts_atomic_init_nob(&tb->fixdel, (erts_aint_t)NULL); erts_atomic_init_nob(&tb->segtab, (erts_aint_t)NULL); SET_SEGTAB(tb, tb->first_segtab); tb->nsegs = NSEG_1; tb->nslots = FIRST_SEGSZ; tb->first_segtab[0] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG, (DbTable *) tb, SIZEOF_SEGMENT(FIRST_SEGSZ)); sys_memset(tb->first_segtab[0], 0, SIZEOF_SEGMENT(FIRST_SEGSZ)); erts_atomic_init_nob(&tb->is_resizing, 0); if (tb->common.type & DB_FINE_LOCKED) { erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER; int i; if (tb->common.type & DB_FREQ_READ) rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ; if (erts_ets_rwmtx_spin_count >= 0) rwmtx_opt.main_spincount = erts_ets_rwmtx_spin_count; tb->locks = (DbTableHashFineLocks*) erts_db_alloc(ERTS_ALC_T_DB_SEG, /* Other type maybe? */ (DbTable *) tb, sizeof(DbTableHashFineLocks)); for (i=0; ilocks->lck_vec[i].lck, &rwmtx_opt, "db_hash_slot", tb->common.the_name, ERTS_LOCK_FLAGS_CATEGORY_DB); } /* This important property is needed to guarantee the two buckets * involved in a grow/shrink operation it protected by the same lock: */ ASSERT(erts_atomic_read_nob(&tb->nactive) % DB_HASH_LOCK_CNT == 0); } else { /* coarse locking */ tb->locks = NULL; } ERTS_THR_MEMORY_BARRIER; return DB_ERROR_NONE; } static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret) { DbTableHash *tb = &tbl->hash; Uint ix = 0; erts_rwmtx_t* lck = RLOCK_HASH(tb,ix); HashDbTerm* list; list = BUCKET(tb,ix); list = next_live(tb, &ix, &lck, list); if (list != NULL) { *ret = db_copy_key(p, tbl, &list->dbterm); RUNLOCK_HASH(lck); } else { *ret = am_EOT; } return DB_ERROR_NONE; } static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; Uint ix; HashDbTerm* b; erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); b = BUCKET(tb, ix); for (;;) { if (b == NULL) { RUNLOCK_HASH(lck); return DB_ERROR_BADKEY; } if (has_key(tb, b, key, hval)) { break; } b = b->next; } /* Key found */ b = next_live(tb, &ix, &lck, b->next); if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while (b != 0) { if (!has_live_key(tb, b, key, hval)) { break; } b = next_live(tb, &ix, &lck, b->next); } } if (b == NULL) { *ret = am_EOT; } else { *ret = db_copy_key(p, tbl, &b->dbterm); RUNLOCK_HASH(lck); } return DB_ERROR_NONE; } int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; Eterm key; HashDbTerm** bp; HashDbTerm* b; HashDbTerm* q; erts_rwmtx_t* lck; int nitems; int ret = DB_ERROR_NONE; key = GETKEY(tb, tuple_val(obj)); hval = MAKE_HASH(key); lck = WLOCK_HASH(tb, hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; for (;;) { if (b == NULL) { goto Lnew; } if (has_key(tb,b,key,hval)) { break; } bp = &b->next; b = b->next; } /* Key found */ if (tb->common.status & DB_SET) { HashDbTerm* bnext = b->next; if (is_pseudo_deleted(b)) { erts_atomic_inc_nob(&tb->common.nitems); b->pseudo_deleted = 0; } else if (key_clash_fail) { ret = DB_ERROR_BADKEY; goto Ldone; } q = replace_dbterm(tb, b, obj); q->next = bnext; ASSERT(q->hvalue == hval); *bp = q; goto Ldone; } else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */ q = b; do { if (!is_pseudo_deleted(q)) { ret = DB_ERROR_BADKEY; goto Ldone; } q = q->next; }while (q != NULL && has_key(tb,q,key,hval)); } else if (tb->common.status & DB_BAG) { HashDbTerm** qp = bp; q = b; do { if (db_eq(&tb->common,obj,&q->dbterm)) { if (is_pseudo_deleted(q)) { erts_atomic_inc_nob(&tb->common.nitems); q->pseudo_deleted = 0; ASSERT(q->hvalue == hval); if (q != b) { /* must move to preserve key insertion order */ *qp = q->next; q->next = b; *bp = q; } } goto Ldone; } qp = &q->next; q = *qp; }while (q != NULL && has_key(tb,q,key,hval)); } /*else DB_DUPLICATE_BAG */ Lnew: q = new_dbterm(tb, obj); q->hvalue = hval; q->pseudo_deleted = 0; q->next = b; *bp = q; nitems = erts_atomic_inc_read_nob(&tb->common.nitems); WUNLOCK_HASH(lck); { int nactive = NACTIVE(tb); if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) { grow(tb, nitems); } } return DB_ERROR_NONE; Ldone: WUNLOCK_HASH(lck); return ret; } static Eterm get_term_list(Process *p, DbTableHash *tb, Eterm key, HashValue hval, HashDbTerm *b1, HashDbTerm **bend) { HashDbTerm* b2 = b1->next; Eterm copy; Uint sz = b1->dbterm.size + 2; if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while (b2 && has_key(tb, b2, key, hval)) { if (!is_pseudo_deleted(b2)) sz += b2->dbterm.size + 2; b2 = b2->next; } } copy = build_term_list(p, b1, b2, sz, tb); if (bend) { *bend = b2; } return copy; } int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b; erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); b = BUCKET(tb, ix); while(b != 0) { if (has_live_key(tb, b, key, hval)) { *ret = get_term_list(p, tb, key, hval, b, NULL); goto done; } b = b->next; } *ret = NIL; done: RUNLOCK_HASH(lck); return DB_ERROR_NONE; } static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; erts_rwmtx_t* lck; hval = MAKE_HASH(key); ix = hash_to_ix(tb, hval); lck = RLOCK_HASH(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { *ret = am_true; goto done; } b1 = b1->next; } *ret = am_false; done: RUNLOCK_HASH(lck); return DB_ERROR_NONE; } static int db_get_element_hash(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; erts_rwmtx_t* lck; int retval; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb, hval); ix = hash_to_ix(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { if (ndex > arityval(b1->dbterm.tpl[0])) { retval = DB_ERROR_BADITEM; goto done; } if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { HashDbTerm* b; HashDbTerm* b2 = b1->next; Eterm elem_list = NIL; while(b2 != NULL && has_key(tb,b2,key,hval)) { if (ndex > arityval(b2->dbterm.tpl[0]) && !is_pseudo_deleted(b2)) { retval = DB_ERROR_BADITEM; goto done; } b2 = b2->next; } b = b1; while(b != b2) { if (!is_pseudo_deleted(b)) { Eterm *hp; Eterm copy = db_copy_element_from_ets(&tb->common, p, &b->dbterm, ndex, &hp, 2); elem_list = CONS(hp, copy, elem_list); } b = b->next; } *ret = elem_list; } else { Eterm* hp; *ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0); } retval = DB_ERROR_NONE; goto done; } b1 = b1->next; } retval = DB_ERROR_BADKEY; done: RUNLOCK_HASH(lck); return retval; } /* ** NB, this is for the db_erase/2 bif. */ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm** bp; HashDbTerm* b; HashDbTerm* free_us = NULL; erts_rwmtx_t* lck; int nitems_diff = 0; hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; while(b != 0) { if (has_live_key(tb,b,key,hval)) { --nitems_diff; if (nitems_diff == -1 && IS_FIXED(tb) && add_fixed_deletion(tb, ix, 0)) { /* Pseudo remove (no need to keep several of same key) */ b->pseudo_deleted = 1; } else { HashDbTerm* next = b->next; b->next = free_us; free_us = b; b = *bp = next; continue; } } else { if (nitems_diff && !is_pseudo_deleted(b)) break; } bp = &b->next; b = b->next; } WUNLOCK_HASH(lck); if (nitems_diff) { erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } free_term_list(tb, free_us); *ret = am_true; return DB_ERROR_NONE; } /* ** This is for the ets:delete_object BIF */ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm** bp; HashDbTerm* b; HashDbTerm* free_us = NULL; erts_rwmtx_t* lck; int nitems_diff = 0; int nkeys = 0; Eterm key; key = GETKEY(tb, tuple_val(object)); hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; while(b != 0) { if (has_live_key(tb,b,key,hval)) { ++nkeys; if (db_eq(&tb->common,object, &b->dbterm)) { --nitems_diff; if (nkeys==1 && IS_FIXED(tb) && add_fixed_deletion(tb,ix,0)) { b->pseudo_deleted = 1; bp = &b->next; b = b->next; } else { HashDbTerm* next = b->next; b->next = free_us; free_us = b; b = *bp = next; } if (tb->common.status & (DB_DUPLICATE_BAG)) { continue; } else { break; } } } else if (nitems_diff && !is_pseudo_deleted(b)) { break; } bp = &b->next; b = b->next; } WUNLOCK_HASH(lck); if (nitems_diff) { erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } free_term_list(tb, free_us); *ret = am_true; return DB_ERROR_NONE; } static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { DbTableHash *tb = &tbl->hash; erts_rwmtx_t* lck; Sint slot; int retval; int nactive; if (is_not_small(slot_term) || ((slot = signed_val(slot_term)) < 0)) { return DB_ERROR_BADPARAM; } lck = RLOCK_HASH(tb, slot); nactive = NACTIVE(tb); if (slot < nactive) { *ret = build_term_list(p, BUCKET(tb, slot), NULL, 0, tb); retval = DB_ERROR_NONE; } else if (slot == nactive) { *ret = am_EOT; retval = DB_ERROR_NONE; } else { retval = DB_ERROR_BADPARAM; } RUNLOCK_HASH(lck); return retval; } /* * Match traversal callbacks */ typedef struct match_callbacks_t_ match_callbacks_t; struct match_callbacks_t_ { /* Called when no match is possible. * context_ptr: Pointer to context * ret: Pointer to traversal function term return. * * Both the direct return value and 'ret' are used as the traversal function return values. */ int (*on_nothing_can_match)(match_callbacks_t* ctx, Eterm* ret); /* Called for each match result. * context_ptr: Pointer to context * slot_ix: Current slot index * current_ptr_ptr: Triple pointer to either the bucket or the 'next' pointer in the previous element; * can be (carefully) used to adjust iteration when deleting or replacing elements. * match_res: The result of running the match program against the current term. * * Should return 1 for successful match, 0 otherwise. */ int (*on_match_res)(match_callbacks_t* ctx, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res); /* Called when either we've matched enough elements in this cycle or EOT was reached. * context_ptr: Pointer to context * slot_ix: Current slot index * got: How many elements have been matched so far * iterations_left: Number of intended iterations (down from an initial max.) left in this traversal cycle * mpp: Double pointer to the compiled match program * ret: Pointer to traversal function term return. * * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ int (*on_loop_ended)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret); /* Called when it's time to trap * context_ptr: Pointer to context * slot_ix: Current slot index * got: How many elements have been matched so far * mpp: Double pointer to the compiled match program * ret: Pointer to traversal function term return. * * Both the direct return value and 'ret' are used as the traversal function return values. * If *mpp is set to NULL, it won't be deallocated (useful for trapping.) */ int (*on_trap)(match_callbacks_t* ctx, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret); }; /* * Begin hash table match traversal */ static int match_traverse(Process* p, DbTableHash* tb, Eterm pattern, extra_match_validator_t extra_match_validator, /* Optional */ Sint chunk_size, /* If 0, no chunking */ Sint iterations_left, /* Nr. of iterations left */ Eterm** hpp, /* Heap */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ match_callbacks_t* ctx, Eterm* ret) { Sint slot_ix; /* Slot index */ HashDbTerm** current_ptr; /* Refers to either the bucket pointer or * the 'next' pointer in the previous term */ HashDbTerm* saved_current; /* Helper to avoid double skip on match */ struct mp_info mpi; unsigned current_list_pos = 0; /* Prefound buckets list index */ Eterm match_res; Sint got = 0; /* Matched terms counter */ erts_rwmtx_t* lck; /* Slot lock */ int ret_value; erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue) = (lock_for_write ? WLOCK_HASH : RLOCK_HASH); void (*unlock_hash_function)(erts_rwmtx_t*) = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH); Sint (*next_slot_function)(DbTableHash*, Uint, erts_rwmtx_t**) = (lock_for_write ? next_slot_w : next_slot); if ((ret_value = analyze_pattern(tb, pattern, extra_match_validator, &mpi)) != DB_ERROR_NONE) { *ret = NIL; goto done; } if (!mpi.something_can_match) { /* Can't possibly match anything */ ret_value = ctx->on_nothing_can_match(ctx, ret); goto done; } /* * Look for initial slot / bucket */ if (!mpi.key_given) { /* Run this code if pattern is variable or GETKEY(pattern) */ /* is a variable */ slot_ix = 0; lck = lock_hash_function(tb,slot_ix); for (;;) { ASSERT(slot_ix < NACTIVE(tb)); if (*(current_ptr = &BUCKET(tb,slot_ix)) != NULL) { break; } slot_ix = next_slot_function(tb,slot_ix,&lck); if (slot_ix == 0) { ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret); goto done; } } } else { /* We have at least one */ slot_ix = mpi.lists[current_list_pos].ix; lck = lock_hash_function(tb, slot_ix); current_ptr = mpi.lists[current_list_pos].bucket; ASSERT(*current_ptr == BUCKET(tb,slot_ix)); ++current_list_pos; } /* * Execute traversal cycle */ for(;;) { if (*current_ptr != NULL) { if (!is_pseudo_deleted(*current_ptr)) { match_res = db_match_dbterm(&tb->common, p, mpi.mp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { ++got; } --iterations_left; if (*current_ptr != saved_current) { /* Don't advance to next, the callback did it already */ continue; } } current_ptr = &((*current_ptr)->next); } else if (mpi.key_given) { /* Key is bound */ unlock_hash_function(lck); if (current_list_pos == mpi.num_lists) { ret_value = ctx->on_loop_ended(ctx, -1, got, iterations_left, &mpi.mp, ret); goto done; } else { slot_ix = mpi.lists[current_list_pos].ix; lck = lock_hash_function(tb, slot_ix); current_ptr = mpi.lists[current_list_pos].bucket; ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); ++current_list_pos; } } else { /* Key is variable */ if ((slot_ix = next_slot_function(tb,slot_ix,&lck)) == 0) { slot_ix = -1; break; } if (chunk_size && got >= chunk_size) { unlock_hash_function(lck); break; } if (iterations_left <= 0 || MBUF(p)) { /* * We have either reached our limit, or just created some heap fragments. * Since many heap fragments will make the GC slower, trap and GC now. */ unlock_hash_function(lck); ret_value = ctx->on_trap(ctx, slot_ix, got, &mpi.mp, ret); goto done; } current_ptr = &BUCKET(tb,slot_ix); } } ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, &mpi.mp, ret); done: /* We should only jump directly to this label if * we've already called ctx->nothing_can_match / loop_ended / trap */ if (mpi.mp != NULL) { erts_bin_free(mpi.mp); } if (mpi.lists != mpi.dlists) { erts_free(ERTS_ALC_T_DB_SEL_LIST, (void *) mpi.lists); } return ret_value; } /* * Continue hash table match traversal */ static int match_traverse_continue(Process* p, DbTableHash* tb, Sint chunk_size, /* If 0, no chunking */ Sint iterations_left, /* Nr. of iterations left */ Eterm** hpp, /* Heap */ Sint slot_ix, /* Slot index to resume traversal from */ Sint got, /* Matched terms counter */ Binary** mpp, /* Existing match program */ int lock_for_write, /* Set to 1 if we're going to delete or modify existing terms */ match_callbacks_t* ctx, Eterm* ret) { HashDbTerm** current_ptr; /* Refers to either the bucket pointer or * the 'next' pointer in the previous term */ HashDbTerm* saved_current; /* Helper to avoid double skip on match */ Eterm match_res; erts_rwmtx_t* lck; int ret_value; erts_rwmtx_t* (*lock_hash_function)(DbTableHash*, HashValue) = (lock_for_write ? WLOCK_HASH : RLOCK_HASH); void (*unlock_hash_function)(erts_rwmtx_t*) = (lock_for_write ? WUNLOCK_HASH : RUNLOCK_HASH); Sint (*next_slot_function)(DbTableHash* tb, Uint ix, erts_rwmtx_t** lck_ptr) = (lock_for_write ? next_slot_w : next_slot); if (got < 0) { *ret = NIL; return DB_ERROR_BADPARAM; } if (slot_ix < 0 /* EOT */ || (chunk_size && got >= chunk_size)) { /* Already got all or enough in the match_list */ ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret); goto done; } lck = lock_hash_function(tb, slot_ix); if (slot_ix >= NACTIVE(tb)) { /* Is this possible? */ unlock_hash_function(lck); *ret = NIL; ret_value = DB_ERROR_BADPARAM; goto done; } /* * Resume traversal cycle from where we left */ current_ptr = &BUCKET(tb,slot_ix); for(;;) { if (*current_ptr != NULL) { if (!is_pseudo_deleted(*current_ptr)) { match_res = db_match_dbterm(&tb->common, p, *mpp, &(*current_ptr)->dbterm, hpp, 2); saved_current = *current_ptr; if (ctx->on_match_res(ctx, slot_ix, ¤t_ptr, match_res)) { ++got; } --iterations_left; if (*current_ptr != saved_current) { /* Don't advance to next, the callback did it already */ continue; } } current_ptr = &((*current_ptr)->next); } else { if ((slot_ix=next_slot_function(tb,slot_ix,&lck)) == 0) { slot_ix = -1; break; } if (chunk_size && got >= chunk_size) { unlock_hash_function(lck); break; } if (iterations_left <= 0 || MBUF(p)) { /* * We have either reached our limit, or just created some heap fragments. * Since many heap fragments will make the GC slower, trap and GC now. */ unlock_hash_function(lck); ret_value = ctx->on_trap(ctx, slot_ix, got, mpp, ret); goto done; } current_ptr = &BUCKET(tb,slot_ix); } } ret_value = ctx->on_loop_ended(ctx, slot_ix, got, iterations_left, mpp, ret); done: /* We should only jump directly to this label if * we've already called on_loop_ended / on_trap */ return ret_value; } /* * Common traversal trapping/continuation code; * used by select_count, select_delete and select_replace, * as well as their continuation-handling counterparts. */ static ERTS_INLINE int on_simple_trap(Export* trap_function, Process* p, DbTableHash* tb, Eterm tid, Eterm* prev_continuation_tptr, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { Eterm* hp; Eterm egot; Eterm mpb; Eterm continuation; int is_first_trap = (prev_continuation_tptr == NULL); size_t base_halloc_sz = (is_first_trap ? ERTS_MAGIC_REF_THING_SIZE : 0); BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { hp = HAllocX(p, base_halloc_sz + 5, ERTS_MAGIC_REF_THING_SIZE); egot = make_small(got); } else { hp = HAllocX(p, base_halloc_sz + BIG_UINT_HEAP_SIZE + 5, ERTS_MAGIC_REF_THING_SIZE); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } if (is_first_trap) { if (is_atom(tid)) tid = erts_db_make_tid(p, &tb->common); mpb = erts_db_make_match_prog_ref(p, *mpp, &hp); *mpp = NULL; /* otherwise the caller will destroy it */ } else { ASSERT(!is_atom(tid)); mpb = prev_continuation_tptr[3]; } continuation = TUPLE4( hp, tid, make_small(slot_ix), mpb, egot); ERTS_BIF_PREP_TRAP1(*ret, trap_function, p, continuation); return DB_ERROR_NONE; } static ERTS_INLINE int unpack_simple_continuation(Eterm continuation, Eterm** tptr_ptr, Eterm* tid_ptr, Sint* slot_ix_p, Binary** mpp, Sint* got_p) { Eterm* tptr; ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); if (arityval(*tptr) != 4) return 1; if (! is_small(tptr[2]) || !(is_big(tptr[4]) || is_small(tptr[4]))) { return 1; } *tptr_ptr = tptr; *tid_ptr = tptr[1]; *slot_ix_p = unsigned_val(tptr[2]); *mpp = erts_db_get_match_prog_binary_unchecked(tptr[3]); if (is_big(tptr[4])) { *got_p = big_to_uint32(tptr[4]); } else { *got_p = unsigned_val(tptr[4]); } return 0; } /* * * select / select_chunk match traversal * */ #define MAX_SELECT_CHUNK_ITERATIONS 1000 typedef struct { match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; Eterm* hp; Sint chunk_size; Eterm match_list; Eterm* prev_continuation_tptr; } select_chunk_context_t; static int select_chunk_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; *ret = (ctx->chunk_size > 0 ? am_EOT : NIL); return DB_ERROR_NONE; } static int select_chunk_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; if (is_value(match_res)) { ctx->match_list = CONS(ctx->hp, match_res, ctx->match_list); return 1; } return 0; } static int select_chunk_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm mpb; if (iterations_left == MAX_SELECT_CHUNK_ITERATIONS) { /* We didn't get to iterate a single time, which means EOT */ ASSERT(ctx->match_list == NIL); *ret = (ctx->chunk_size > 0 ? am_EOT : NIL); return DB_ERROR_NONE; } else { ASSERT(iterations_left < MAX_SELECT_CHUNK_ITERATIONS); BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); if (ctx->chunk_size) { Eterm continuation; Eterm rest = NIL; Sint rest_size = 0; if (got > ctx->chunk_size) { /* Split list in return value and 'rest' */ Eterm tmp = ctx->match_list; rest = ctx->match_list; while (got-- > ctx->chunk_size + 1) { tmp = CDR(list_val(tmp)); ++rest_size; } ++rest_size; ctx->match_list = CDR(list_val(tmp)); CDR(list_val(tmp)) = NIL; /* Destructive, the list has never been in 'user space' */ } if (rest != NIL || slot_ix >= 0) { /* Need more calls */ Eterm tid = ctx->tid; ctx->hp = HAllocX(ctx->p, 3 + 7 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &ctx->hp); if (is_atom(tid)) tid = erts_db_make_tid(ctx->p, &ctx->tb->common); continuation = TUPLE6( ctx->hp, tid, make_small(slot_ix), make_small(ctx->chunk_size), mpb, rest, make_small(rest_size)); *mpp = NULL; /* Otherwise the caller will destroy it */ ctx->hp += 7; *ret = TUPLE2(ctx->hp, ctx->match_list, continuation); return DB_ERROR_NONE; } else { /* All data is exhausted */ if (ctx->match_list != NIL) { /* No more data to search but still a result to return to the caller */ ctx->hp = HAlloc(ctx->p, 3); *ret = TUPLE2(ctx->hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { /* Reached the end of the ttable with no data to return */ *ret = am_EOT; return DB_ERROR_NONE; } } } *ret = ctx->match_list; return DB_ERROR_NONE; } } static int select_chunk_on_trap(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm mpb; Eterm continuation; Eterm* hp; BUMP_ALL_REDS(ctx->p); if (ctx->prev_continuation_tptr == NULL) { Eterm tid = ctx->tid; /* First time we're trapping */ hp = HAllocX(ctx->p, 7 + ERTS_MAGIC_REF_THING_SIZE, ERTS_MAGIC_REF_THING_SIZE); if (is_atom(tid)) tid = erts_db_make_tid(ctx->p, &ctx->tb->common); mpb = erts_db_make_match_prog_ref(ctx->p, *mpp, &hp); continuation = TUPLE6( hp, tid, make_small(slot_ix), make_small(ctx->chunk_size), mpb, ctx->match_list, make_small(got)); *mpp = NULL; /* otherwise the caller will destroy it */ } else { /* Not the first time we're trapping; reuse continuation terms */ hp = HAlloc(ctx->p, 7); continuation = TUPLE6( hp, ctx->prev_continuation_tptr[1], make_small(slot_ix), ctx->prev_continuation_tptr[3], ctx->prev_continuation_tptr[4], ctx->match_list, make_small(got)); } ERTS_BIF_PREP_TRAP1(*ret, &ets_select_continue_exp, ctx->p, continuation); return DB_ERROR_NONE; } static int db_select_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, int reverse, Eterm *ret) { return db_select_chunk_hash(p, tbl, tid, pattern, 0, reverse, ret); } static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret) { select_chunk_context_t ctx; ctx.base.on_nothing_can_match = select_chunk_on_nothing_can_match; ctx.base.on_match_res = select_chunk_on_match_res; ctx.base.on_loop_ended = select_chunk_on_loop_ended; ctx.base.on_trap = select_chunk_on_trap, ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = NIL; ctx.prev_continuation_tptr = NULL; return match_traverse( ctx.p, ctx.tb, pattern, NULL, ctx.chunk_size, MAX_SELECT_CHUNK_ITERATIONS, &ctx.hp, 0, &ctx.base, ret); } /* * * select_continue match traversal * */ static int select_chunk_continue_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_chunk_context_t* ctx = (select_chunk_context_t*) ctx_base; Eterm continuation; Eterm rest = NIL; Eterm* hp; ASSERT(iterations_left <= MAX_SELECT_CHUNK_ITERATIONS); BUMP_REDS(ctx->p, MAX_SELECT_CHUNK_ITERATIONS - iterations_left); if (ctx->chunk_size) { Sint rest_size = 0; if (got > ctx->chunk_size) { /* Cannot write destructively here, the list may have been in user space */ hp = HAlloc(ctx->p, (got - ctx->chunk_size) * 2); while (got-- > ctx->chunk_size) { rest = CONS(hp, CAR(list_val(ctx->match_list)), rest); hp += 2; ctx->match_list = CDR(list_val(ctx->match_list)); ++rest_size; } } if (rest != NIL || slot_ix >= 0) { hp = HAlloc(ctx->p, 3 + 7); continuation = TUPLE6( hp, ctx->prev_continuation_tptr[1], make_small(slot_ix), ctx->prev_continuation_tptr[3], ctx->prev_continuation_tptr[4], rest, make_small(rest_size)); hp += 7; *ret = TUPLE2(hp, ctx->match_list, continuation); return DB_ERROR_NONE; } else { if (ctx->match_list != NIL) { hp = HAlloc(ctx->p, 3); *ret = TUPLE2(hp, ctx->match_list, am_EOT); return DB_ERROR_NONE; } else { *ret = am_EOT; return DB_ERROR_NONE; } } } *ret = ctx->match_list; return DB_ERROR_NONE; } /* * This is called when select traps */ static int db_select_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { select_chunk_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; Sint got; Sint slot_ix; Sint chunk_size; Eterm match_list; Sint iterations_left = MAX_SELECT_CHUNK_ITERATIONS; /* Decode continuation. We know it's a tuple but not the arity or anything else */ ASSERT(is_tuple(continuation)); tptr = tuple_val(continuation); if (arityval(*tptr) != 6) goto badparam; if (!is_small(tptr[2]) || !is_small(tptr[3]) || !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6])) goto badparam; if ((chunk_size = signed_val(tptr[3])) < 0) goto badparam; mp = erts_db_get_match_prog_binary(tptr[4]); if (mp == NULL) goto badparam; if ((got = signed_val(tptr[6])) < 0) goto badparam; tid = tptr[1]; slot_ix = signed_val(tptr[2]); match_list = tptr[5]; /* Proceed */ ctx.base.on_match_res = select_chunk_on_match_res; ctx.base.on_loop_ended = select_chunk_continue_on_loop_ended; ctx.base.on_trap = select_chunk_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.hp = NULL; ctx.chunk_size = chunk_size; ctx.match_list = match_list; ctx.prev_continuation_tptr = tptr; return match_traverse_continue( ctx.p, ctx.tb, ctx.chunk_size, iterations_left, &ctx.hp, slot_ix, got, &mp, 0, &ctx.base, ret); badparam: *ret = NIL; return DB_ERROR_BADPARAM; } #undef MAX_SELECT_CHUNK_ITERATIONS /* * * select_count match traversal * */ #define MAX_SELECT_COUNT_ITERATIONS 1000 typedef struct { match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; Eterm* prev_continuation_tptr; } select_count_context_t; static int select_count_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } static int select_count_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { return (match_res == am_true); } static int select_count_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_count_context_t* ctx = (select_count_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_COUNT_ITERATIONS); BUMP_REDS(ctx->p, MAX_SELECT_COUNT_ITERATIONS - iterations_left); *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } static int select_count_on_trap(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { select_count_context_t* ctx = (select_count_context_t*) ctx_base; return on_simple_trap( &ets_select_count_continue_exp, ctx->p, ctx->tb, ctx->tid, ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } static int db_select_count_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { select_count_context_t ctx; Sint iterations_left = MAX_SELECT_COUNT_ITERATIONS; Sint chunk_size = 0; ctx.base.on_nothing_can_match = select_count_on_nothing_can_match; ctx.base.on_match_res = select_count_on_match_res; ctx.base.on_loop_ended = select_count_on_loop_ended; ctx.base.on_trap = select_count_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; return match_traverse( ctx.p, ctx.tb, pattern, NULL, chunk_size, iterations_left, NULL, 0, &ctx.base, ret); } /* * This is called when select_count traps */ static int db_select_count_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { select_count_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; Sint got; Sint slot_ix; Sint chunk_size = 0; *ret = NIL; if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } ctx.base.on_match_res = select_count_on_match_res; ctx.base.on_loop_ended = select_count_on_loop_ended; ctx.base.on_trap = select_count_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; return match_traverse_continue( ctx.p, ctx.tb, chunk_size, MAX_SELECT_COUNT_ITERATIONS, NULL, slot_ix, got, &mp, 0, &ctx.base, ret); } #undef MAX_SELECT_COUNT_ITERATIONS /* * * select_delete match traversal * */ #define MAX_SELECT_DELETE_ITERATIONS 1000 typedef struct { match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; Eterm* prev_continuation_tptr; erts_aint_t fixated_by_me; Uint last_pseudo_delete; HashDbTerm* free_us; } select_delete_context_t; static int select_delete_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } static int select_delete_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { HashDbTerm** current_ptr = *current_ptr_ptr; select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; HashDbTerm* del; if (match_res != am_true) return 0; if (NFIXED(ctx->tb) > ctx->fixated_by_me) { /* fixated by others? */ if (slot_ix != ctx->last_pseudo_delete) { if (!add_fixed_deletion(ctx->tb, slot_ix, ctx->fixated_by_me)) goto do_erase; ctx->last_pseudo_delete = slot_ix; } (*current_ptr)->pseudo_deleted = 1; } else { do_erase: del = *current_ptr; *current_ptr = (*current_ptr)->next; // replace pointer to term using next del->next = ctx->free_us; ctx->free_us = del; } erts_atomic_dec_nob(&ctx->tb->common.nitems); return 1; } static int select_delete_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; free_term_list(ctx->tb, ctx->free_us); ctx->free_us = NULL; ASSERT(iterations_left <= MAX_SELECT_DELETE_ITERATIONS); BUMP_REDS(ctx->p, MAX_SELECT_DELETE_ITERATIONS - iterations_left); if (got) { try_shrink(ctx->tb); } *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } static int select_delete_on_trap(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { select_delete_context_t* ctx = (select_delete_context_t*) ctx_base; free_term_list(ctx->tb, ctx->free_us); ctx->free_us = NULL; return on_simple_trap( &ets_select_delete_continue_exp, ctx->p, ctx->tb, ctx->tid, ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { select_delete_context_t ctx; Sint chunk_size = 0; ctx.base.on_nothing_can_match = select_delete_on_nothing_can_match; ctx.base.on_match_res = select_delete_on_match_res; ctx.base.on_loop_ended = select_delete_on_loop_ended; ctx.base.on_trap = select_delete_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; ctx.fixated_by_me = ctx.tb->common.is_thread_safe ? 0 : 1; /* TODO: something nicer */ ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; return match_traverse( ctx.p, ctx.tb, pattern, NULL, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, 1, &ctx.base, ret); } /* * This is called when select_delete traps */ static int db_select_delete_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { select_delete_context_t ctx; Eterm* tptr; Eterm tid; Binary* mp; Sint got; Sint slot_ix; Sint chunk_size = 0; if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } ctx.base.on_match_res = select_delete_on_match_res; ctx.base.on_loop_ended = select_delete_on_loop_ended; ctx.base.on_trap = select_delete_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; ctx.fixated_by_me = ONLY_WRITER(p, ctx.tb) ? 0 : 1; /* TODO: something nicer */ ctx.last_pseudo_delete = (Uint) -1; ctx.free_us = NULL; return match_traverse_continue( ctx.p, ctx.tb, chunk_size, MAX_SELECT_DELETE_ITERATIONS, NULL, slot_ix, got, &mp, 1, &ctx.base, ret); } #undef MAX_SELECT_DELETE_ITERATIONS /* * * select_replace match traversal * */ #define MAX_SELECT_REPLACE_ITERATIONS 1000 typedef struct { match_callbacks_t base; Process* p; DbTableHash* tb; Eterm tid; Eterm* prev_continuation_tptr; } select_replace_context_t; static int select_replace_on_nothing_can_match(match_callbacks_t* ctx_base, Eterm* ret) { *ret = make_small(0); return DB_ERROR_NONE; } static int select_replace_on_match_res(match_callbacks_t* ctx_base, Sint slot_ix, HashDbTerm*** current_ptr_ptr, Eterm match_res) { select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; DbTableHash* tb = ctx->tb; HashDbTerm* new; HashDbTerm* next; HashValue hval; if (is_value(match_res)) { #ifdef DEBUG Eterm key = db_getkey(tb->common.keypos, match_res); ASSERT(is_value(key)); ASSERT(eq(key, GETKEY(tb, (**current_ptr_ptr)->dbterm.tpl))); #endif next = (**current_ptr_ptr)->next; hval = (**current_ptr_ptr)->hvalue; new = new_dbterm(tb, match_res); new->next = next; new->hvalue = hval; new->pseudo_deleted = 0; free_term(tb, **current_ptr_ptr); **current_ptr_ptr = new; /* replace 'next' pointer in previous object */ *current_ptr_ptr = &((**current_ptr_ptr)->next); /* advance to next object */ return 1; } return 0; } static int select_replace_on_loop_ended(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Sint iterations_left, Binary** mpp, Eterm* ret) { select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; ASSERT(iterations_left <= MAX_SELECT_REPLACE_ITERATIONS); /* the more objects we've replaced, the more reductions we've consumed */ BUMP_REDS(ctx->p, MIN(MAX_SELECT_REPLACE_ITERATIONS * 2, (MAX_SELECT_REPLACE_ITERATIONS - iterations_left) + (int)got)); *ret = erts_make_integer(got, ctx->p); return DB_ERROR_NONE; } static int select_replace_on_trap(match_callbacks_t* ctx_base, Sint slot_ix, Sint got, Binary** mpp, Eterm* ret) { select_replace_context_t* ctx = (select_replace_context_t*) ctx_base; return on_simple_trap( &ets_select_replace_continue_exp, ctx->p, ctx->tb, ctx->tid, ctx->prev_continuation_tptr, slot_ix, got, mpp, ret); } static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, Eterm pattern, Eterm *ret) { select_replace_context_t ctx; Sint chunk_size = 0; /* Bag implementation presented both semantic consistency and performance issues, * unsupported for now */ ASSERT(!(tbl->hash.common.status & DB_BAG)); ctx.base.on_nothing_can_match = select_replace_on_nothing_can_match; ctx.base.on_match_res = select_replace_on_match_res; ctx.base.on_loop_ended = select_replace_on_loop_ended; ctx.base.on_trap = select_replace_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; return match_traverse( ctx.p, ctx.tb, pattern, db_match_keeps_key, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, 1, &ctx.base, ret); } /* * This is called when select_replace traps */ static int db_select_replace_continue_hash(Process* p, DbTable* tbl, Eterm continuation, Eterm* ret) { select_replace_context_t ctx; Eterm* tptr; Eterm tid ; Binary* mp; Sint got; Sint slot_ix; Sint chunk_size = 0; *ret = NIL; if (unpack_simple_continuation(continuation, &tptr, &tid, &slot_ix, &mp, &got)) { *ret = NIL; return DB_ERROR_BADPARAM; } /* Proceed */ ctx.base.on_match_res = select_replace_on_match_res; ctx.base.on_loop_ended = select_replace_on_loop_ended; ctx.base.on_trap = select_replace_on_trap; ctx.p = p; ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = tptr; return match_traverse_continue( ctx.p, ctx.tb, chunk_size, MAX_SELECT_REPLACE_ITERATIONS, NULL, slot_ix, got, &mp, 1, &ctx.base, ret); } static int db_take_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashDbTerm **bp, *b; HashDbTerm *free_us = NULL; HashValue hval = MAKE_HASH(key); erts_rwmtx_t *lck = WLOCK_HASH(tb, hval); int ix = hash_to_ix(tb, hval); int nitems_diff = 0; *ret = NIL; for (bp = &BUCKET(tb, ix), b = *bp; b; bp = &b->next, b = b->next) { if (has_live_key(tb, b, key, hval)) { HashDbTerm *bend; *ret = get_term_list(p, tb, key, hval, b, &bend); while (b != bend) { --nitems_diff; if (nitems_diff == -1 && IS_FIXED(tb) && add_fixed_deletion(tb, ix, 0)) { /* Pseudo remove (no need to keep several of same key) */ bp = &b->next; b->pseudo_deleted = 1; b = b->next; } else { HashDbTerm* next = b->next; b->next = free_us; free_us = b; b = *bp = next; } } break; } } WUNLOCK_HASH(lck); if (nitems_diff) { erts_atomic_add_nob(&tb->common.nitems, nitems_diff); try_shrink(tb); } free_term_list(tb, free_us); return DB_ERROR_NONE; } /* ** Other interface routines (not directly coupled to one bif) */ void db_initialize_hash(void) { } static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds) { const int LOOPS_PER_REDUCTION = 8; DbTableHash *tb = &tbl->hash; FixedDeletion* fixdel; SWord loops = reds * LOOPS_PER_REDUCTION; int i; ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb)); fixdel = (FixedDeletion*) erts_atomic_read_nob(&tb->fixdel); if (fixdel && fixdel->trap) { /* Continue after trap */ ASSERT(fixdel->all); ASSERT(fixdel->slot < NACTIVE(tb)); i = fixdel->slot; } else { /* First call */ fixdel = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); link_fixdel(tb, fixdel, 0); i = 0; } do { HashDbTerm* b; for (b = BUCKET(tb,i); b; b = b->next) b->pseudo_deleted = 1; } while (++i < NACTIVE(tb) && --loops > 0); if (i < NACTIVE(tb)) { /* Yield */ fixdel->slot = i; fixdel->all = 0; fixdel->trap = 1; return -1; } fixdel->slot = NACTIVE(tb) - 1; fixdel->all = 1; fixdel->trap = 0; erts_atomic_set_nob(&tb->common.nitems, 0); return loops < 0 ? 0 : loops / LOOPS_PER_REDUCTION; } /* Display hash table contents (for dump) */ static void db_print_hash(fmtfn_t to, void *to_arg, int show, DbTable *tbl) { DbTableHash *tb = &tbl->hash; DbHashStats stats; int i; erts_print(to, to_arg, "Buckets: %d\n", NACTIVE(tb)); i = tbl->common.is_thread_safe; /* If crash dumping we set table to thread safe in order to avoid taking any locks */ if (ERTS_IS_CRASH_DUMPING) tbl->common.is_thread_safe = 1; db_calc_stats_hash(&tbl->hash, &stats); tbl->common.is_thread_safe = i; erts_print(to, to_arg, "Chain Length Avg: %f\n", stats.avg_chain_len); erts_print(to, to_arg, "Chain Length Max: %d\n", stats.max_chain_len); erts_print(to, to_arg, "Chain Length Min: %d\n", stats.min_chain_len); erts_print(to, to_arg, "Chain Length Std Dev: %f\n", stats.std_dev_chain_len); erts_print(to, to_arg, "Chain Length Expected Std Dev: %f\n", stats.std_dev_expected); if (IS_FIXED(tb)) erts_print(to, to_arg, "Fixed: %d\n", stats.kept_items); else erts_print(to, to_arg, "Fixed: false\n"); if (show) { for (i = 0; i < NACTIVE(tb); i++) { HashDbTerm* list = BUCKET(tb,i); if (list == NULL) continue; erts_print(to, to_arg, "%d: [", i); while(list != 0) { if (is_pseudo_deleted(list)) erts_print(to, to_arg, "*"); if (tb->common.compress) { Eterm key = GETKEY(tb, list->dbterm.tpl); erts_print(to, to_arg, "key=%T", key); } else { Eterm obj = make_tuple(list->dbterm.tpl); erts_print(to, to_arg, "%T", obj); } if (list->next != 0) erts_print(to, to_arg, ","); list = list->next; } erts_print(to, to_arg, "]\n"); } } } /* release all memory occupied by a single table */ static int db_free_table_hash(DbTable *tbl) { while (db_free_table_continue_hash(tbl, ERTS_SWORD_MAX) < 0) ; return 0; } static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds) { DbTableHash *tb = &tbl->hash; FixedDeletion* fixdel = (FixedDeletion*) erts_atomic_read_acqb(&tb->fixdel); ERTS_LC_ASSERT(IS_TAB_WLOCKED(tb) || (tb->common.status & DB_DELETE)); while (fixdel != NULL) { FixedDeletion *fx = fixdel; fixdel = fx->next; erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, (void *) fx, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); if (--reds < 0) { erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel); return reds; /* Not done */ } } erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)NULL); while(tb->nslots != 0) { reds -= EXT_SEGSZ/64 + free_seg(tb, 1); /* * If we have done enough work, get out here. */ if (reds < 0) { return reds; /* Not done */ } } if (tb->locks != NULL) { int i; for (i=0; ilocks, sizeof(DbTableHashFineLocks)); tb->locks = NULL; } ASSERT(erts_atomic_read_nob(&tb->common.memory_size) == sizeof(DbTable)); return reds; /* Done */ } /* ** Utility routines. (static) */ /* ** For the select functions, analyzes the pattern and determines which ** slots should be searched. Also compiles the match program */ static int analyze_pattern(DbTableHash *tb, Eterm pattern, extra_match_validator_t extra_validator, /* Optional callback */ struct mp_info *mpi) { Eterm *ptpl; Eterm lst, tpl, ttpl; Eterm *matches,*guards, *bodies; Eterm sbuff[30]; Eterm *buff = sbuff; Eterm key = NIL; HashValue hval = NIL; int num_heads = 0; int i; mpi->lists = mpi->dlists; mpi->num_lists = 0; mpi->key_given = 1; mpi->something_can_match = 0; mpi->mp = NULL; for (lst = pattern; is_list(lst); lst = CDR(list_val(lst))) ++num_heads; if (lst != NIL) {/* proper list... */ return DB_ERROR_BADPARAM; } if (num_heads > 10) { buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3); mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST, sizeof(*(mpi->lists)) * num_heads); } matches = buff; guards = buff + num_heads; bodies = buff + (num_heads * 2); i = 0; for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) { Eterm match; Eterm guard; Eterm body; ttpl = CAR(list_val(lst)); if (!is_tuple(ttpl)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } ptpl = tuple_val(ttpl); if (ptpl[0] != make_arityval(3U)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } matches[i] = match = tpl = ptpl[1]; guards[i] = guard = ptpl[2]; bodies[i] = body = ptpl[3]; if(extra_validator != NULL && !extra_validator(tb->common.keypos, match, guard, body)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { } ++i; if (!(mpi->key_given)) { continue; } if (tpl == am_Underscore || db_is_variable(tpl) != -1) { (mpi->key_given) = 0; (mpi->something_can_match) = 1; } else { key = db_getkey(tb->common.keypos, tpl); if (is_value(key)) { if (!db_has_variable(key)) { /* Bound key */ int ix, search_slot; HashDbTerm** bp; erts_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb,ix); if (lck == NULL) { search_slot = search_list(tb,key,hval,*bp) != NULL; } else { /* No point to verify if key exist now as there may be concurrent inserters/deleters anyway */ RUNLOCK_HASH(lck); search_slot = 1; } if (search_slot) { int j; for (j=0; ; ++j) { if (j == mpi->num_lists) { mpi->lists[mpi->num_lists].bucket = bp; mpi->lists[mpi->num_lists].ix = ix; ++mpi->num_lists; break; } if (mpi->lists[j].bucket == bp) { ASSERT(mpi->lists[j].ix == ix); break; } ASSERT(mpi->lists[j].ix != ix); } mpi->something_can_match = 1; } } else { mpi->key_given = 0; mpi->something_can_match = 1; } } } } /* * It would be nice not to compile the match_spec if nothing could match, * but then the select calls would not fail like they should on bad * match specs that happen to specify non existent keys etc. */ if ((mpi->mp = db_match_compile(matches, guards, bodies, num_heads, DCOMP_TABLE, NULL)) == NULL) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_NONE; } static struct ext_segtab* alloc_ext_segtab(DbTableHash* tb, unsigned seg_ix) { struct segment** old_segtab = SEGTAB(tb); int nsegs = 0; struct ext_segtab* est; ASSERT(seg_ix >= NSEG_1); switch (seg_ix) { case NSEG_1: nsegs = NSEG_2; break; default: nsegs = seg_ix + NSEG_INC; break; } ASSERT(nsegs > tb->nsegs); est = (struct ext_segtab*) erts_db_alloc(ERTS_ALC_T_DB_SEG, (DbTable *) tb, SIZEOF_EXT_SEGTAB(nsegs)); est->nsegs = nsegs; est->prev_segtab = old_segtab; est->prev_nsegs = tb->nsegs; sys_memcpy(est->segtab, old_segtab, tb->nsegs*sizeof(struct segment*)); #ifdef DEBUG sys_memset(&est->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*)); #endif return est; } /* Extend table with one new segment */ static void alloc_seg(DbTableHash *tb) { int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots); struct segment** segtab; ASSERT(seg_ix > 0); if (seg_ix == tb->nsegs) { /* New segtab needed */ struct ext_segtab* est = alloc_ext_segtab(tb, seg_ix); SET_SEGTAB(tb, est->segtab); tb->nsegs = est->nsegs; } ASSERT(seg_ix < tb->nsegs); segtab = SEGTAB(tb); segtab[seg_ix] = (struct segment*) erts_db_alloc(ERTS_ALC_T_DB_SEG, (DbTable *) tb, SIZEOF_SEGMENT(EXT_SEGSZ)); sys_memset(segtab[seg_ix], 0, SIZEOF_SEGMENT(EXT_SEGSZ)); tb->nslots += EXT_SEGSZ; } static void dealloc_ext_segtab(void* lop_data) { struct ext_segtab* est = (struct ext_segtab*) lop_data; erts_free(ERTS_ALC_T_DB_SEG, est); } /* Shrink table by freeing the top segment ** free_records: 1=free any records in segment, 0=assume segment is empty */ static int free_seg(DbTableHash *tb, int free_records) { const int seg_ix = SLOT_IX_TO_SEG_IX(tb->nslots) - 1; struct segment** const segtab = SEGTAB(tb); struct segment* const segp = segtab[seg_ix]; Uint seg_sz; int nrecords = 0; ASSERT(segp != NULL); #ifndef DEBUG if (free_records) #endif { int i = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ; while (i--) { HashDbTerm* p = segp->buckets[i]; while(p != 0) { HashDbTerm* nxt = p->next; ASSERT(free_records); /* segment not empty as assumed? */ free_term(tb, p); p = nxt; ++nrecords; } } } if (seg_ix >= NSEG_1) { struct ext_segtab* est = ErtsContainerStruct_(segtab,struct ext_segtab,segtab); if (seg_ix == est->prev_nsegs) { /* Dealloc extended segtab */ ASSERT(est->prev_segtab != NULL); SET_SEGTAB(tb, est->prev_segtab); tb->nsegs = est->prev_nsegs; if (!tb->common.is_thread_safe) { /* * Table is doing a graceful shrink operation and we must avoid * deallocating this segtab while it may still be read by other * threads. Schedule deallocation with thread progress to make * sure no lingering threads are still hanging in BUCKET macro * with an old segtab pointer. */ Uint sz = SIZEOF_EXT_SEGTAB(est->nsegs); ASSERT(sz == ERTS_ALC_DBG_BLK_SZ(est)); ERTS_DB_ALC_MEM_UPDATE_(tb, sz, 0); erts_schedule_thr_prgr_later_cleanup_op(dealloc_ext_segtab, est, &est->lop, sz); } else erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable*)tb, est, SIZEOF_EXT_SEGTAB(est->nsegs)); } } seg_sz = (seg_ix == 0) ? FIRST_SEGSZ : EXT_SEGSZ; erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, segp, SIZEOF_SEGMENT(seg_sz)); #ifdef DEBUG if (seg_ix < tb->nsegs) SEGTAB(tb)[seg_ix] = NULL; #endif tb->nslots -= seg_sz; ASSERT(tb->nslots >= 0); return nrecords; } /* ** Copy terms from ptr1 until ptr2 ** works for ptr1 == ptr2 == 0 => [] ** or ptr2 == 0 ** sz is either precalculated heap size or 0 if not known */ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, Uint sz, DbTableHash* tb) { HashDbTerm* ptr; Eterm list = NIL; Eterm copy; Eterm *hp, *hend; if (!sz) { ptr = ptr1; while(ptr != ptr2) { if (!is_pseudo_deleted(ptr)) sz += ptr->dbterm.size + 2; ptr = ptr->next; } } hp = HAlloc(p, sz); hend = hp + sz; ptr = ptr1; while(ptr != ptr2) { if (!is_pseudo_deleted(ptr)) { copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p)); list = CONS(hp, copy, list); hp += 2; } ptr = ptr->next; } HRelease(p,hend,hp); return list; } static ERTS_INLINE int begin_resizing(DbTableHash* tb) { if (DB_USING_FINE_LOCKING(tb)) return !erts_atomic_xchg_acqb(&tb->is_resizing, 1); else ERTS_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&tb->common.rwlock)); return 1; } static ERTS_INLINE void done_resizing(DbTableHash* tb) { if (DB_USING_FINE_LOCKING(tb)) erts_atomic_set_relb(&tb->is_resizing, 0); } /* Grow table with one or more new buckets. ** Allocate new segment if needed. */ static void grow(DbTableHash* tb, int nitems) { HashDbTerm** pnext; HashDbTerm** to_pnext; HashDbTerm* p; erts_rwmtx_t* lck; int nactive; int from_ix, to_ix; int szm; int loop_limit = 5; do { if (!begin_resizing(tb)) return; /* already in progress */ nactive = NACTIVE(tb); if (nitems <= GROW_LIMIT(nactive)) { goto abort; /* already done (race) */ } /* Ensure that the slot nactive exists */ if (nactive == tb->nslots) { /* Time to get a new segment */ ASSERT(((nactive-FIRST_SEGSZ) & EXT_SEGSZ_MASK) == 0); alloc_seg(tb); } ASSERT(nactive < tb->nslots); szm = erts_atomic_read_nob(&tb->szm); if (nactive <= szm) { from_ix = nactive & (szm >> 1); } else { ASSERT(nactive == szm+1); from_ix = 0; szm = (szm<<1) | 1; } to_ix = nactive; lck = WLOCK_HASH(tb, from_ix); ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,to_ix)); /* Now a final double check (with the from_ix lock held) * that we did not get raced by a table fixer. */ if (IS_FIXED(tb)) { WUNLOCK_HASH(lck); goto abort; } erts_atomic_set_nob(&tb->nactive, ++nactive); if (from_ix == 0) { if (DB_USING_FINE_LOCKING(tb)) erts_atomic_set_relb(&tb->szm, szm); else erts_atomic_set_nob(&tb->szm, szm); } done_resizing(tb); /* Finally, let's split the bucket. We try to do it in a smart way to keep link order and avoid unnecessary updates of next-pointers */ pnext = &BUCKET(tb, from_ix); p = *pnext; to_pnext = &BUCKET(tb, to_ix); while (p != NULL) { if (is_pseudo_deleted(p)) { /* rare but possible with fine locking */ *pnext = p->next; free_term(tb, p); p = *pnext; } else { int ix = p->hvalue & szm; if (ix != from_ix) { ASSERT(ix == (from_ix ^ ((szm+1)>>1))); *to_pnext = p; /* Swap "from" and "to": */ from_ix = ix; to_pnext = pnext; } pnext = &p->next; p = *pnext; } } *to_pnext = NULL; WUNLOCK_HASH(lck); }while (--loop_limit && nitems > GROW_LIMIT(nactive)); return; abort: done_resizing(tb); } /* Shrink table by joining top bucket. ** Remove top segment if it gets empty. */ static void shrink(DbTableHash* tb, int nitems) { HashDbTerm** src_bp; HashDbTerm** dst_bp; HashDbTerm** bp; erts_rwmtx_t* lck; int src_ix, dst_ix, low_szm; int nactive; int loop_limit = 5; do { if (!begin_resizing(tb)) return; /* already in progress */ nactive = NACTIVE(tb); if (!(nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive))) { goto abort; /* already done (race) */ } src_ix = nactive - 1; low_szm = erts_atomic_read_nob(&tb->szm) >> 1; dst_ix = src_ix & low_szm; ASSERT(dst_ix < src_ix); ASSERT(nactive > FIRST_SEGSZ); lck = WLOCK_HASH(tb, dst_ix); ERTS_ASSERT(lck == GET_LOCK_MAYBE(tb,src_ix)); /* Double check for racing table fixers */ if (IS_FIXED(tb)) { WUNLOCK_HASH(lck); goto abort; } src_bp = &BUCKET(tb, src_ix); dst_bp = &BUCKET(tb, dst_ix); bp = src_bp; /* * We join lists by appending "dst" at the end of "src" * as we must step through "src" anyway to purge pseudo deleted. */ while(*bp != NULL) { if (is_pseudo_deleted(*bp)) { HashDbTerm* deleted = *bp; *bp = deleted->next; free_term(tb, deleted); } else { bp = &(*bp)->next; } } *bp = *dst_bp; *dst_bp = *src_bp; *src_bp = NULL; nactive = src_ix; erts_atomic_set_nob(&tb->nactive, nactive); if (dst_ix == 0) { erts_atomic_set_relb(&tb->szm, low_szm); } WUNLOCK_HASH(lck); if (tb->nslots - src_ix >= EXT_SEGSZ) { free_seg(tb, 0); } done_resizing(tb); } while (--loop_limit && nactive > FIRST_SEGSZ && nitems < SHRINK_LIMIT(nactive)); return; abort: done_resizing(tb); } /* Search a list of tuples for a matching key */ static HashDbTerm* search_list(DbTableHash* tb, Eterm key, HashValue hval, HashDbTerm *list) { while (list != 0) { if (has_live_key(tb,list,key,hval)) return list; list = list->next; } return 0; } /* This function is called by the next AND the select BIF */ /* It return the next live object in a table, NULL if no more */ /* In-bucket: RLOCKED */ /* Out-bucket: RLOCKED unless NULL */ static HashDbTerm* next_live(DbTableHash *tb, Uint *iptr, erts_rwmtx_t** lck_ptr, HashDbTerm *list) { int i; ERTS_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr)); for ( ; list != NULL; list = list->next) { if (!is_pseudo_deleted(list)) return list; } i = *iptr; while ((i=next_slot(tb, i, lck_ptr)) != 0) { list = BUCKET(tb,i); while (list != NULL) { if (!is_pseudo_deleted(list)) { *iptr = i; return list; } list = list->next; } } /* *iptr = ??? */ return NULL; } static int db_lookup_dbterm_hash(Process *p, DbTable *tbl, Eterm key, Eterm obj, DbUpdateHandle* handle) { DbTableHash *tb = &tbl->hash; HashValue hval; HashDbTerm **bp, *b; erts_rwmtx_t* lck; int flags = 0; ASSERT(tb->common.status & DB_SET); hval = MAKE_HASH(key); lck = WLOCK_HASH(tb, hval); bp = &BUCKET(tb, hash_to_ix(tb, hval)); b = *bp; for (;;) { if (b == NULL) { break; } if (has_key(tb, b, key, hval)) { if (!is_pseudo_deleted(b)) { goto Ldone; } break; } bp = &b->next; b = *bp; } if (obj == THE_NON_VALUE) { WUNLOCK_HASH(lck); return 0; } { Eterm *objp = tuple_val(obj); int arity = arityval(*objp); Eterm *htop, *hend; ASSERT(arity >= tb->common.keypos); htop = HAlloc(p, arity + 1); hend = htop + arity + 1; sys_memcpy(htop, objp, sizeof(Eterm) * (arity + 1)); htop[tb->common.keypos] = key; obj = make_tuple(htop); if (b == NULL) { HashDbTerm *q = new_dbterm(tb, obj); q->hvalue = hval; q->pseudo_deleted = 0; q->next = NULL; *bp = b = q; flags |= DB_INC_TRY_GROW; } else { HashDbTerm *q, *next = b->next; ASSERT(is_pseudo_deleted(b)); q = replace_dbterm(tb, b, obj); q->next = next; ASSERT(q->hvalue == hval); q->pseudo_deleted = 0; *bp = b = q; erts_atomic_inc_nob(&tb->common.nitems); } HRelease(p, hend, htop); flags |= DB_NEW_OBJECT; } Ldone: handle->tb = tbl; handle->bp = (void **)bp; handle->dbterm = &b->dbterm; handle->flags = flags; handle->new_size = b->dbterm.size; handle->lck = lck; return 1; } /* Must be called after call to db_lookup_dbterm */ static void db_finalize_dbterm_hash(int cret, DbUpdateHandle* handle) { DbTable* tbl = handle->tb; DbTableHash *tb = &tbl->hash; HashDbTerm **bp = (HashDbTerm **) handle->bp; HashDbTerm *b = *bp; erts_rwmtx_t* lck = (erts_rwmtx_t*) handle->lck; HashDbTerm* free_me = NULL; ERTS_LC_ASSERT(IS_HASH_WLOCKED(tb, lck)); /* locked by db_lookup_dbterm_hash */ ASSERT((&b->dbterm == handle->dbterm) == !(tb->common.compress && handle->flags & DB_MUST_RESIZE)); if (handle->flags & DB_NEW_OBJECT && cret != DB_ERROR_NONE) { if (IS_FIXED(tb) && add_fixed_deletion(tb, hash_to_ix(tb, b->hvalue), 0)) { b->pseudo_deleted = 1; } else { *bp = b->next; free_me = b; } WUNLOCK_HASH(lck); erts_atomic_dec_nob(&tb->common.nitems); try_shrink(tb); } else { if (handle->flags & DB_MUST_RESIZE) { db_finalize_resize(handle, offsetof(HashDbTerm,dbterm)); free_me = b; } if (handle->flags & DB_INC_TRY_GROW) { int nactive; int nitems = erts_atomic_inc_read_nob(&tb->common.nitems); WUNLOCK_HASH(lck); nactive = NACTIVE(tb); if (nitems > GROW_LIMIT(nactive) && !IS_FIXED(tb)) { grow(tb, nitems); } } else { WUNLOCK_HASH(lck); } } if (free_me) free_term(tb, free_me); #ifdef DEBUG handle->dbterm = 0; #endif return; } static SWord db_delete_all_objects_hash(Process* p, DbTable* tbl, SWord reds) { if (IS_FIXED(tbl)) { reds = db_mark_all_deleted_hash(tbl, reds); } else { reds = db_free_table_continue_hash(tbl, reds); if (reds < 0) return reds; db_create_hash(p, tbl); erts_atomic_set_nob(&tbl->hash.common.nitems, 0); } return reds; } void db_foreach_offheap_hash(DbTable *tbl, void (*func)(ErlOffHeap *, void *), void * arg) { DbTableHash *tb = &tbl->hash; HashDbTerm* list; int i; int nactive = NACTIVE(tb); for (i = 0; i < nactive; i++) { list = BUCKET(tb,i); while(list != 0) { ErlOffHeap tmp_offheap; tmp_offheap.first = list->dbterm.first_oh; tmp_offheap.overhead = 0; (*func)(&tmp_offheap, arg); list->dbterm.first_oh = tmp_offheap.first; list = list->next; } } } void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats) { HashDbTerm* b; erts_rwmtx_t* lck; int sum = 0; int sq_sum = 0; int kept_items = 0; int ix; int len; stats->min_chain_len = INT_MAX; stats->max_chain_len = 0; ix = 0; lck = RLOCK_HASH(tb,ix); do { len = 0; for (b = BUCKET(tb,ix); b!=NULL; b=b->next) { len++; if (is_pseudo_deleted(b)) ++kept_items; } sum += len; sq_sum += len*len; if (len < stats->min_chain_len) stats->min_chain_len = len; if (len > stats->max_chain_len) stats->max_chain_len = len; ix = next_slot(tb,ix,&lck); }while (ix); stats->avg_chain_len = (float)sum / NACTIVE(tb); stats->std_dev_chain_len = sqrt((sq_sum - stats->avg_chain_len*sum) / NACTIVE(tb)); /* Expected standard deviation from a good uniform hash function, ie binomial distribution (not taking the linear hashing into acount) */ stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb))); stats->kept_items = kept_items; } /* For testing only */ Eterm erts_ets_hash_sizeof_ext_segtab(void) { return make_small(((SIZEOF_EXT_SEGTAB(0)-1) / sizeof(UWord)) + 1); } #ifdef ERTS_ENABLE_LOCK_COUNT void erts_lcnt_enable_db_hash_lock_count(DbTableHash *tb, int enable) { int i; if(tb->locks == NULL) { return; } for(i = 0; i < DB_HASH_LOCK_CNT; i++) { erts_lcnt_ref_t *ref = &tb->locks->lck_vec[i].lck.lcnt; if(enable) { erts_lcnt_install_new_lock_info(ref, "db_hash_slot", tb->common.the_name, ERTS_LOCK_TYPE_RWMUTEX | ERTS_LOCK_FLAGS_CATEGORY_DB); } else { erts_lcnt_uninstall(ref); } } } #endif /* ERTS_ENABLE_LOCK_COUNT */