/* * %CopyrightBegin% * * Copyright Ericsson AB 1998-2011. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ /* ** Implementation of unordered ETS tables. ** The tables are implemented as linear dynamic hash tables. */ /* SMP: ** The hash table supports two different locking "modes", ** coarse grained and fine grained locking. ** ** Coarse grained locking relies entirely on the caller (erl_db.c) to obtain ** the right kind of lock on the entire table depending on operation (reading ** or writing). No further locking is then done by the table itself. ** ** Fine grained locking is supported by this code to allow concurrent updates ** (and reading) to different parts of the table. This works by keeping one ** rw-mtx for every N'th bucket. Even dynamic growing and shrinking by ** rehashing buckets can be done without exclusive table lock. ** ** A table will support fine grained locking if it is created with flag ** DB_FINE_LOCKED set. The table variable is_thread_safe will then indicate ** if operations need to obtain fine grained locks or not. Some operations ** will for example always use exclusive table lock to guarantee ** a higher level of atomicy. */ /* FIXATION: ** Fixating the table, by ets:safe_fixtable or as done by select-operations, ** guarantees two things in current implementation. ** (1) Keys will not *totaly* disappear from the table. A key can thus be used ** as an iterator to find the next key in iteration sequence. Note however ** that this does not mean that (pointers to) table objects are guaranteed ** to be maintained while the table is fixated. A BAG or DBAG may actually ** remove objects as long as there is at least one object left in the table ** with the same key (alive or pseudo-deleted). ** (2) Objects will not be moved between buckets due to table grow/shrink. ** This will guarantee that iterations do not miss keys or get double-hits. ** ** With fine grained locking, a concurrent thread can fixate the table at any ** time. A "dangerous" operation (delete or move) therefore needs to check ** if the table is fixated while write-locking the bucket. */ /* #ifdef DEBUG #define HARDDEBUG 1 #endif */ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "sys.h" #include "erl_vm.h" #include "global.h" #include "erl_process.h" #include "error.h" #define ERTS_WANT_DB_INTERNAL__ #include "erl_db.h" #include "bif.h" #include "big.h" #include "export.h" #include "erl_binary.h" #include "erl_db_hash.h" #ifdef MYDEBUG /* Will fail test case ets_SUITE:memory */ # define IF_DEBUG(x) x # define MY_ASSERT(x) ASSERT(x) #else # define IF_DEBUG(x) # define MY_ASSERT(x) #endif /* * The following symbols can be manipulated to "tune" the linear hash array */ #define CHAIN_LEN 6 /* Medium bucket chain len */ /* Number of slots per segment */ #define SEGSZ_EXP 8 #define SEGSZ (1 << SEGSZ_EXP) #define SEGSZ_MASK (SEGSZ-1) #define NSEG_1 2 /* Size of first segment table (must be at least 2) */ #define NSEG_2 256 /* Size of second segment table */ #define NSEG_INC 128 /* Number of segments to grow after that */ #define SEGTAB(tb) ((struct segment**)erts_smp_atomic_read_acqb(&(tb)->segtab)) #define NACTIVE(tb) ((int)erts_smp_atomic_read(&(tb)->nactive)) #define NITEMS(tb) ((int)erts_smp_atomic_read(&(tb)->common.nitems)) #define BUCKET(tb, i) SEGTAB(tb)[(i) >> SEGSZ_EXP]->buckets[(i) & SEGSZ_MASK] /* * When deleting a table, the number of records to delete. * Approximate number, because we must delete entire buckets. */ #define DELETE_RECORD_LIMIT 10000 /* Calculate slot index from hash value. ** RLOCK_HASH or WLOCK_HASH must be done before. */ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval) { Uint mask = erts_smp_atomic_read_acqb(&tb->szm); Uint ix = hval & mask; if (ix >= erts_smp_atomic_read(&tb->nactive)) { ix &= mask>>1; ASSERT(ix < erts_smp_atomic_read(&tb->nactive)); } return ix; } /* Remember a slot containing a pseudo-deleted item (INVALID_HASH) */ static ERTS_INLINE void add_fixed_deletion(DbTableHash* tb, int ix) { erts_aint_t was_next; erts_aint_t exp_next; FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion)); fixd->slot = ix; was_next = erts_smp_atomic_read(&tb->fixdel); do { /* Lockless atomic insertion in linked list: */ exp_next = was_next; fixd->next = (FixedDeletion*) exp_next; was_next = erts_smp_atomic_cmpxchg(&tb->fixdel, (erts_aint_t) fixd, exp_next); }while (was_next != exp_next); } #define MAX_HASH 0xEFFFFFFFUL #define INVALID_HASH 0xFFFFFFFFUL /* optimised version of make_hash (normal case? atomic key) */ #define MAKE_HASH(term) \ ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ make_hash2(term)) % MAX_HASH) #ifdef ERTS_SMP # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1) # define GET_LOCK(tb,hval) (&(tb)->locks->lck_vec[(hval) & DB_HASH_LOCK_MASK].lck) /* Fine grained read lock */ static ERTS_INLINE erts_smp_rwmtx_t* RLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); erts_smp_rwmtx_rlock(lck); return lck; } } /* Fine grained write lock */ static ERTS_INLINE erts_smp_rwmtx_t* WLOCK_HASH(DbTableHash* tb, HashValue hval) { if (tb->common.is_thread_safe) { return NULL; } else { erts_smp_rwmtx_t* lck = GET_LOCK(tb,hval); ASSERT(tb->common.type & DB_FINE_LOCKED); erts_smp_rwmtx_rwlock(lck); return lck; } } static ERTS_INLINE void RUNLOCK_HASH(erts_smp_rwmtx_t* lck) { if (lck != NULL) { erts_smp_rwmtx_runlock(lck); } } static ERTS_INLINE void WUNLOCK_HASH(erts_smp_rwmtx_t* lck) { if (lck != NULL) { erts_smp_rwmtx_rwunlock(lck); } } #else /* ERTS_SMP */ # define RLOCK_HASH(tb,hval) NULL # define WLOCK_HASH(tb,hval) NULL # define RUNLOCK_HASH(lck) ((void)lck) # define WUNLOCK_HASH(lck) ((void)lck) #endif /* ERTS_SMP */ #ifdef ERTS_ENABLE_LOCK_CHECK # define IFN_EXCL(tb,cmd) (((tb)->common.is_thread_safe) || (cmd)) # define IS_HASH_RLOCKED(tb,hval) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rlocked(GET_LOCK(tb,hval))) # define IS_HASH_WLOCKED(tb,lck) IFN_EXCL(tb,erts_smp_lc_rwmtx_is_rwlocked(lck)) # define IS_TAB_WLOCKED(tb) erts_smp_lc_rwmtx_is_rwlocked(&(tb)->common.rwlock) #else # define IS_HASH_RLOCKED(tb,hval) (1) # define IS_HASH_WLOCKED(tb,hval) (1) # define IS_TAB_WLOCKED(tb) (1) #endif /* Iteration helper ** Returns "next" slot index or 0 if EOT reached. ** Slot READ locks updated accordingly, unlocked if EOT. */ static ERTS_INLINE Sint next_slot(DbTableHash* tb, Uint ix, erts_smp_rwmtx_t** lck_ptr) { #ifdef ERTS_SMP ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; RUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = RLOCK_HASH(tb,ix); return ix; #else return (++ix < NACTIVE(tb)) ? ix : 0; #endif } /* Same as next_slot but with WRITE locking */ static ERTS_INLINE Sint next_slot_w(DbTableHash* tb, Uint ix, erts_smp_rwmtx_t** lck_ptr) { #ifdef ERTS_SMP ix += DB_HASH_LOCK_CNT; if (ix < NACTIVE(tb)) return ix; WUNLOCK_HASH(*lck_ptr); ix = (ix + 1) & DB_HASH_LOCK_MASK; if (ix != 0) *lck_ptr = WLOCK_HASH(tb,ix); return ix; #else return next_slot(tb,ix,lck_ptr); #endif } /* * Some special binary flags */ #define BIN_FLAG_ALL_OBJECTS BIN_FLAG_USR1 static ERTS_INLINE void free_term(DbTableHash *tb, HashDbTerm* p) { db_free_term((DbTable*)tb, p, offsetof(HashDbTerm, dbterm)); } /* * Local types */ struct mp_prefound { HashDbTerm** bucket; int ix; }; struct mp_info { int all_objects; /* True if complete objects are always * returned from the match_spec (can use * copy_shallow on the return value) */ int something_can_match; /* The match_spec is not "impossible" */ int key_given; struct mp_prefound dlists[10]; /* Default list of "pre-found" buckets */ struct mp_prefound* lists; /* Buckets to search if keys are given, * = dlists initially */ unsigned num_lists; /* Number of elements in "lists", * = 0 initially */ Binary *mp; /* The compiled match program */ }; /* A table segment */ struct segment { HashDbTerm* buckets[SEGSZ]; #ifdef MYDEBUG int is_ext_segment; #endif }; /* A segment that also contains a segment table */ struct ext_segment { struct segment s; /* The segment itself. Must be first */ struct segment** prev_segtab; /* Used when table is shrinking */ int nsegs; /* Size of segtab */ struct segment* segtab[1]; /* The segment table */ }; #define SIZEOF_EXTSEG(NSEGS) \ (sizeof(struct ext_segment) - sizeof(struct segment*) + sizeof(struct segment*)*(NSEGS)) #ifdef DEBUG # include /* offsetof */ # define EXTSEG(SEGTAB_PTR) \ ((struct ext_segment*) (((char*)(SEGTAB_PTR)) - offsetof(struct ext_segment,segtab))) #endif /* How the table segments relate to each other: ext_segment: ext_segment: "plain" segment #=================# #================# #=============# | bucket[0] |<--+ +------->| bucket[256] | +->| bucket[512] | | bucket[1] | | | | [257] | | | [513] | : : | | : : | : : | bucket[255] | | | | [511] | | | [767] | |-----------------| | | |----------------| | #=============# | prev_segtab=NULL| | | +--<---prev_segtab | | | nsegs = 2 | | | | | nsegs = 256 | | +->| segtab[0] -->-------+---|---|--<---segtab[0] |<-+ | | | segtab[1] -->-----------+---|--<---segtab[1] | | | | #=================# | | segtab[2] -->-----|--+ ext_segment: | | : : | #================# +----------------<---------------+ | segtab[255] ->----|----->| bucket[255*256]| #================# | | | | : : | |----------------| +----<---prev_segtab | : : */ /* ** Forward decl's (static functions) */ static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix, struct segment** old_segtab); static int alloc_seg(DbTableHash *tb); static int free_seg(DbTableHash *tb, int free_records); static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr, HashDbTerm *list); static HashDbTerm* search_list(DbTableHash* tb, Eterm key, HashValue hval, HashDbTerm *list); static void shrink(DbTableHash* tb, int nactive); static void grow(DbTableHash* tb, int nactive); static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, DbTableHash*); static int analyze_pattern(DbTableHash *tb, Eterm pattern, struct mp_info *mpi); /* * Method interface functions */ static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret); static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret); static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret); static int db_get_element_hash(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret); static int db_erase_object_hash(DbTable *tbl, Eterm object,Eterm *ret); static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret); static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm pattern, Sint chunk_size, int reverse, Eterm *ret); static int db_select_hash(Process *p, DbTable *tbl, Eterm pattern, int reverse, Eterm *ret); static int db_select_count_hash(Process *p, DbTable *tbl, Eterm pattern, Eterm *ret); static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm pattern, Eterm *ret); static int db_select_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_count_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static int db_select_delete_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret); static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl); static int db_free_table_hash(DbTable *tbl); static int db_free_table_continue_hash(DbTable *tbl); static void db_foreach_offheap_hash(DbTable *, void (*)(ErlOffHeap *, void *), void *); static int db_delete_all_objects_hash(Process* p, DbTable* tbl); #ifdef HARDDEBUG static void db_check_table_hash(DbTableHash *tb); #endif static int db_lookup_dbterm_hash(DbTable *tbl, Eterm key, DbUpdateHandle* handle); static void db_finalize_dbterm_hash(DbUpdateHandle* handle); static ERTS_INLINE void try_shrink(DbTableHash* tb) { int nactive = NACTIVE(tb); if (nactive > SEGSZ && NITEMS(tb) < (nactive * CHAIN_LEN) && !IS_FIXED(tb)) { shrink(tb, nactive); } } #define EQ_REL(x,y,y_base) \ (is_same(x,NULL,y,y_base) || (is_not_both_immed((x),(y)) && eq_rel((x),NULL,(y),y_base))) /* Is this a live object (not pseodo-deleted) with the specified key? */ static ERTS_INLINE int has_live_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { if (b->hvalue != hval) return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); return EQ_REL(key, itemKey, b->dbterm.tpl); } } /* Has this object the specified key? Can be pseudo-deleted. */ static ERTS_INLINE int has_key(DbTableHash* tb, HashDbTerm* b, Eterm key, HashValue hval) { if (b->hvalue != hval && b->hvalue != INVALID_HASH) return 0; else { Eterm itemKey = GETKEY(tb, b->dbterm.tpl); ASSERT(!is_header(itemKey)); return EQ_REL(key, itemKey, b->dbterm.tpl); } } static ERTS_INLINE HashDbTerm* new_dbterm(DbTableHash* tb, Eterm obj) { HashDbTerm* p; if (tb->common.compress) { p = db_store_term_comp(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj); } else { p = db_store_term(&tb->common, NULL, offsetof(HashDbTerm,dbterm), obj); } return p; } static ERTS_INLINE HashDbTerm* replace_dbterm(DbTableHash* tb, HashDbTerm* old, Eterm obj) { HashDbTerm* ret; ASSERT(old != NULL); if (tb->common.compress) { ret = db_store_term_comp(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj); } else { ret = db_store_term(&tb->common, &(old->dbterm), offsetof(HashDbTerm,dbterm), obj); } return ret; } /* ** External interface */ DbTableMethod db_hash = { db_create_hash, db_first_hash, db_next_hash, db_first_hash, /* last == first */ db_next_hash, /* prev == next */ db_put_hash, db_get_hash, db_get_element_hash, db_member_hash, db_erase_hash, db_erase_object_hash, db_slot_hash, db_select_chunk_hash, db_select_hash, db_select_delete_hash, db_select_continue_hash, /* hmm continue_hash? */ db_select_delete_continue_hash, db_select_count_hash, db_select_count_continue_hash, db_delete_all_objects_hash, db_free_table_hash, db_free_table_continue_hash, db_print_hash, db_foreach_offheap_hash, #ifdef HARDDEBUG db_check_table_hash, #else NULL, #endif db_lookup_dbterm_hash, db_finalize_dbterm_hash }; #ifdef DEBUG /* Wait a while to provoke race and get code coverage */ static void DEBUG_WAIT(void) { unsigned long spin = 1UL << 20; while (--spin); } #else # define DEBUG_WAIT() #endif /* Rare case of restoring the rest of the fixdel list when "unfixer" gets interrupted by "fixer" */ static void restore_fixdel(DbTableHash* tb, FixedDeletion* fixdel) { /*int tries = 0;*/ DEBUG_WAIT(); if (erts_smp_atomic_cmpxchg(&tb->fixdel, (erts_aint_t)fixdel, (erts_aint_t)NULL) != (erts_aint_t)NULL) { /* Oboy, must join lists */ FixedDeletion* last = fixdel; erts_aint_t was_tail; erts_aint_t exp_tail; while (last->next != NULL) last = last->next; was_tail = erts_smp_atomic_read(&tb->fixdel); do { /* Lockless atomic list insertion */ exp_tail = was_tail; last->next = (FixedDeletion*) exp_tail; /*++tries;*/ DEBUG_WAIT(); was_tail = erts_smp_atomic_cmpxchg(&tb->fixdel, (erts_aint_t)fixdel, exp_tail); }while (was_tail != exp_tail); } /*erts_fprintf(stderr,"erl_db_hash: restore_fixdel tries=%d\r\n", tries);*/ } /* ** Table interface routines ie what's called by the bif's */ void db_unfix_table_hash(DbTableHash *tb) { FixedDeletion* fixdel; ERTS_SMP_LC_ASSERT(erts_smp_lc_rwmtx_is_rwlocked(&tb->common.rwlock) || (erts_smp_lc_rwmtx_is_rlocked(&tb->common.rwlock) && !tb->common.is_thread_safe)); restart: fixdel = (FixedDeletion*) erts_smp_atomic_xchg(&tb->fixdel, (erts_aint_t)NULL); while (fixdel != NULL) { FixedDeletion *fx = fixdel; int ix = fx->slot; HashDbTerm **bp; HashDbTerm *b; erts_smp_rwmtx_t* lck = WLOCK_HASH(tb,ix); if (IS_FIXED(tb)) { /* interrupted by fixer */ WUNLOCK_HASH(lck); restore_fixdel(tb,fixdel); if (!IS_FIXED(tb)) { goto restart; /* unfixed again! */ } return; } if (ix < NACTIVE(tb)) { bp = &BUCKET(tb, ix); b = *bp; while (b != NULL) { if (b->hvalue == INVALID_HASH) { *bp = b->next; free_term(tb, b); b = *bp; } else { bp = &b->next; b = b->next; } } } /* else slot has been joined and purged by shrink() */ WUNLOCK_HASH(lck); fixdel = fx->next; erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, (void *) fx, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); } /* ToDo: Maybe try grow/shrink the table as well */ } /* Only used by tests */ Uint db_kept_items_hash(DbTableHash *tb) { Uint kept_items = 0; Uint ix = 0; erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix); HashDbTerm* b; do { for (b = BUCKET(tb, ix); b != NULL; b = b->next) { if (b->hvalue == INVALID_HASH) { ++kept_items; } } ix = next_slot(tb, ix, &lck); }while (ix); return kept_items; } int db_create_hash(Process *p, DbTable *tbl) { DbTableHash *tb = &tbl->hash; erts_smp_atomic_init(&tb->szm, SEGSZ_MASK); erts_smp_atomic_init(&tb->nactive, SEGSZ); erts_smp_atomic_init(&tb->fixdel, (erts_aint_t)NULL); erts_smp_atomic_init(&tb->segtab, (erts_aint_t) alloc_ext_seg(tb,0,NULL)->segtab); tb->nsegs = NSEG_1; tb->nslots = SEGSZ; erts_smp_atomic_init(&tb->is_resizing, 0); #ifdef ERTS_SMP if (tb->common.type & DB_FINE_LOCKED) { erts_smp_rwmtx_opt_t rwmtx_opt = ERTS_SMP_RWMTX_OPT_DEFAULT_INITER; int i; if (tb->common.type & DB_FREQ_READ) rwmtx_opt.type = ERTS_SMP_RWMTX_TYPE_FREQUENT_READ; tb->locks = (DbTableHashFineLocks*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, /* Other type maybe? */ (DbTable *) tb, sizeof(DbTableHashFineLocks)); for (i=0; ilocks->lck_vec[i].lck, &rwmtx_opt, "db_hash_slot", make_small(i)); } /* This important property is needed to guarantee that the buckets * involved in a grow/shrink operation it protected by the same lock: */ ASSERT(erts_smp_atomic_read(&tb->nactive) % DB_HASH_LOCK_CNT == 0); } else { /* coarse locking */ tb->locks = NULL; } ERTS_THR_MEMORY_BARRIER; #endif /* ERST_SMP */ return DB_ERROR_NONE; } static int db_first_hash(Process *p, DbTable *tbl, Eterm *ret) { DbTableHash *tb = &tbl->hash; Uint ix = 0; erts_smp_rwmtx_t* lck = RLOCK_HASH(tb,ix); HashDbTerm* list; for (;;) { list = BUCKET(tb,ix); if (list != NULL) { if (list->hvalue == INVALID_HASH) { list = next(tb,&ix,&lck,list); } break; } if ((ix=next_slot(tb,ix,&lck)) == 0) { list = NULL; break; } } if (list != NULL) { *ret = db_copy_key(p, tbl, &list->dbterm); RUNLOCK_HASH(lck); } else { *ret = am_EOT; } return DB_ERROR_NONE; } static int db_next_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; Uint ix; HashDbTerm* b; erts_smp_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); b = BUCKET(tb, ix); for (;;) { if (b == NULL) { RUNLOCK_HASH(lck); return DB_ERROR_BADKEY; } if (has_key(tb, b, key, hval)) { break; } b = b->next; } /* Key found */ b = next(tb, &ix, &lck, b); if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while (b != 0) { if (!has_live_key(tb, b, key, hval)) { break; } b = next(tb, &ix, &lck, b); } } if (b == NULL) { *ret = am_EOT; } else { *ret = db_copy_key(p, tbl, &b->dbterm); RUNLOCK_HASH(lck); } return DB_ERROR_NONE; } int db_put_hash(DbTable *tbl, Eterm obj, int key_clash_fail) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; Eterm key; HashDbTerm** bp; HashDbTerm* b; HashDbTerm* q; erts_smp_rwmtx_t* lck; int nitems; int ret = DB_ERROR_NONE; key = GETKEY(tb, tuple_val(obj)); hval = MAKE_HASH(key); lck = WLOCK_HASH(tb, hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; for (;;) { if (b == NULL) { goto Lnew; } if (has_key(tb,b,key,hval)) { break; } bp = &b->next; b = b->next; } /* Key found */ if (tb->common.status & DB_SET) { HashDbTerm* bnext = b->next; if (b->hvalue == INVALID_HASH) { erts_smp_atomic_inc(&tb->common.nitems); } else if (key_clash_fail) { ret = DB_ERROR_BADKEY; goto Ldone; } q = replace_dbterm(tb, b, obj); q->next = bnext; q->hvalue = hval; /* In case of INVALID_HASH */ *bp = q; goto Ldone; } else if (key_clash_fail) { /* && (DB_BAG || DB_DUPLICATE_BAG) */ q = b; do { if (q->hvalue != INVALID_HASH) { ret = DB_ERROR_BADKEY; goto Ldone; } q = q->next; }while (q != NULL && has_key(tb,q,key,hval)); } else if (tb->common.status & DB_BAG) { HashDbTerm** qp = bp; q = b; do { if (db_eq(&tb->common,obj,&q->dbterm)) { if (q->hvalue == INVALID_HASH) { erts_smp_atomic_inc(&tb->common.nitems); q->hvalue = hval; if (q != b) { /* must move to preserve key insertion order */ *qp = q->next; q->next = b; *bp = q; } } goto Ldone; } qp = &q->next; q = *qp; }while (q != NULL && has_key(tb,q,key,hval)); } /*else DB_DUPLICATE_BAG */ Lnew: q = new_dbterm(tb, obj); q->hvalue = hval; q->next = b; *bp = q; nitems = erts_smp_atomic_inctest(&tb->common.nitems); WUNLOCK_HASH(lck); { int nactive = NACTIVE(tb); if (nitems > nactive * (CHAIN_LEN+1) && !IS_FIXED(tb)) { grow(tb, nactive); } } CHECK_TABLES(); return DB_ERROR_NONE; Ldone: WUNLOCK_HASH(lck); return ret; } int db_get_hash(Process *p, DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; erts_smp_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { HashDbTerm* b2 = b1->next; Eterm copy; if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { while(b2 != NULL && has_key(tb,b2,key,hval)) b2 = b2->next; } copy = build_term_list(p, b1, b2, tb); CHECK_TABLES(); *ret = copy; goto done; } b1 = b1->next; } *ret = NIL; done: RUNLOCK_HASH(lck); return DB_ERROR_NONE; } int db_get_element_array(DbTable *tbl, Eterm key, int ndex, Eterm *ret, int *num_ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; int num = 0; int retval; erts_smp_rwmtx_t* lck; ASSERT(!IS_FIXED(tbl)); /* no support for fixed tables here */ hval = MAKE_HASH(key); lck = RLOCK_HASH(tb, hval); ix = hash_to_ix(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { HashDbTerm* b; HashDbTerm* b2 = b1->next; while(b2 != NULL && has_live_key(tb,b2,key,hval)) { if (ndex > arityval(b2->dbterm.tpl[0])) { retval = DB_ERROR_BADITEM; goto done; } b2 = b2->next; } b = b1; while(b != b2) { if (num < *num_ret) { ret[num++] = b->dbterm.tpl[ndex]; } else { retval = DB_ERROR_NONE; goto done; } b = b->next; } *num_ret = num; } else { ASSERT(*num_ret > 0); ret[0] = b1->dbterm.tpl[ndex]; *num_ret = 1; } retval = DB_ERROR_NONE; goto done; } b1 = b1->next; } retval = DB_ERROR_BADKEY; done: RUNLOCK_HASH(lck); return retval; } static int db_member_hash(DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; erts_smp_rwmtx_t* lck; hval = MAKE_HASH(key); ix = hash_to_ix(tb, hval); lck = RLOCK_HASH(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { *ret = am_true; goto done; } b1 = b1->next; } *ret = am_false; done: RUNLOCK_HASH(lck); return DB_ERROR_NONE; } static int db_get_element_hash(Process *p, DbTable *tbl, Eterm key, int ndex, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm* b1; erts_smp_rwmtx_t* lck; int retval; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb, hval); ix = hash_to_ix(tb, hval); b1 = BUCKET(tb, ix); while(b1 != 0) { if (has_live_key(tb,b1,key,hval)) { if (ndex > arityval(b1->dbterm.tpl[0])) { retval = DB_ERROR_BADITEM; goto done; } if (tb->common.status & (DB_BAG | DB_DUPLICATE_BAG)) { HashDbTerm* b; HashDbTerm* b2 = b1->next; Eterm elem_list = NIL; while(b2 != NULL && has_key(tb,b2,key,hval)) { if (ndex > arityval(b2->dbterm.tpl[0]) && b2->hvalue != INVALID_HASH) { retval = DB_ERROR_BADITEM; goto done; } b2 = b2->next; } b = b1; while(b != b2) { if (b->hvalue != INVALID_HASH) { Eterm *hp; Eterm copy = db_copy_element_from_ets(&tb->common, p, &b->dbterm, ndex, &hp, 2); elem_list = CONS(hp, copy, elem_list); hp += 2; } b = b->next; } *ret = elem_list; } else { Eterm* hp; *ret = db_copy_element_from_ets(&tb->common, p, &b1->dbterm, ndex, &hp, 0); } retval = DB_ERROR_NONE; goto done; } b1 = b1->next; } retval = DB_ERROR_BADKEY; done: RUNLOCK_HASH(lck); return retval; } /* * Very internal interface, removes elements of arity two from * BAG. Used for the PID meta table */ int db_erase_bag_exact2(DbTable *tbl, Eterm key, Eterm value) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm** bp; HashDbTerm* b; erts_smp_rwmtx_t* lck; int found = 0; hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; ASSERT(!IS_FIXED(tb)); ASSERT((tb->common.status & DB_BAG)); ASSERT(!tb->common.compress); while(b != 0) { if (has_live_key(tb,b,key,hval)) { found = 1; if ((arityval(b->dbterm.tpl[0]) == 2) && EQ(value, b->dbterm.tpl[2])) { *bp = b->next; free_term(tb, b); erts_smp_atomic_dec(&tb->common.nitems); b = *bp; break; } } else if (found) { break; } bp = &b->next; b = b->next; } WUNLOCK_HASH(lck); if (found) { try_shrink(tb); } return DB_ERROR_NONE; } /* ** NB, this is for the db_erase/2 bif. */ int db_erase_hash(DbTable *tbl, Eterm key, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm** bp; HashDbTerm* b; erts_smp_rwmtx_t* lck; int nitems_diff = 0; hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; while(b != 0) { if (has_live_key(tb,b,key,hval)) { --nitems_diff; if (nitems_diff == -1 && IS_FIXED(tb)) { /* Pseudo remove (no need to keep several of same key) */ add_fixed_deletion(tb, ix); b->hvalue = INVALID_HASH; } else { *bp = b->next; free_term(tb, b); b = *bp; continue; } } else { if (nitems_diff && b->hvalue != INVALID_HASH) break; } bp = &b->next; b = b->next; } WUNLOCK_HASH(lck); if (nitems_diff) { erts_smp_atomic_add(&tb->common.nitems, nitems_diff); try_shrink(tb); } *ret = am_true; return DB_ERROR_NONE; } /* ** This is for the ets:delete_object BIF */ static int db_erase_object_hash(DbTable *tbl, Eterm object, Eterm *ret) { DbTableHash *tb = &tbl->hash; HashValue hval; int ix; HashDbTerm** bp; HashDbTerm* b; erts_smp_rwmtx_t* lck; int nitems_diff = 0; int nkeys = 0; Eterm key; key = GETKEY(tb, tuple_val(object)); hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb, ix); b = *bp; while(b != 0) { if (has_live_key(tb,b,key,hval)) { ++nkeys; if (db_eq(&tb->common,object, &b->dbterm)) { --nitems_diff; if (nkeys==1 && IS_FIXED(tb)) { /* Pseudo remove */ add_fixed_deletion(tb,ix); b->hvalue = INVALID_HASH; bp = &b->next; b = b->next; } else { *bp = b->next; free_term(tb, b); b = *bp; } if (tb->common.status & (DB_DUPLICATE_BAG)) { continue; } else { break; } } } else if (nitems_diff && b->hvalue != INVALID_HASH) { break; } bp = &b->next; b = b->next; } WUNLOCK_HASH(lck); if (nitems_diff) { erts_smp_atomic_add(&tb->common.nitems, nitems_diff); try_shrink(tb); } *ret = am_true; return DB_ERROR_NONE; } static int db_slot_hash(Process *p, DbTable *tbl, Eterm slot_term, Eterm *ret) { DbTableHash *tb = &tbl->hash; erts_smp_rwmtx_t* lck; Sint slot; int retval; int nactive; if (is_not_small(slot_term) || ((slot = signed_val(slot_term)) < 0)) { return DB_ERROR_BADPARAM; } lck = RLOCK_HASH(tb, slot); nactive = NACTIVE(tb); if (slot < nactive) { *ret = build_term_list(p, BUCKET(tb, slot), 0, tb); retval = DB_ERROR_NONE; } else if (slot == nactive) { *ret = am_EOT; retval = DB_ERROR_NONE; } else { retval = DB_ERROR_BADPARAM; } RUNLOCK_HASH(lck); return retval; } /* * This is just here so I can take care of the return value * that is to be sent during a trap (the BIF_TRAP macros explicitly returns) */ static BIF_RETTYPE bif_trap1(Export *bif, Process *p, Eterm p1) { BIF_TRAP1(bif, p, p1); } /* * Continue collecting select matches, this may happen either due to a trap * or when the user calls ets:select/1 */ static int db_select_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { DbTableHash *tb = &tbl->hash; Sint slot_ix; Sint save_slot_ix; Sint chunk_size; int all_objects; Binary *mp; int num_left = 1000; HashDbTerm *current = 0; Eterm match_list; Eterm *hp; Eterm match_res; Sint got; Eterm *tptr; erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term, State) do { *ret = (Term); return State; } while(0); /* Decode continuation. We know it's a tuple but not the arity or anything else */ tptr = tuple_val(continuation); if (arityval(*tptr) != 6) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); if (!is_small(tptr[2]) || !is_small(tptr[3]) || !is_binary(tptr[4]) || !(is_list(tptr[5]) || tptr[5] == NIL) || !is_small(tptr[6])) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); if ((chunk_size = signed_val(tptr[3])) < 0) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); if (!(thing_subtag(*binary_val(tptr[4])) == REFC_BINARY_SUBTAG)) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); mp = ((ProcBin *) binary_val(tptr[4]))->val; if (!IsMatchProgBinary(mp)) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); all_objects = mp->flags & BIN_FLAG_ALL_OBJECTS; match_list = tptr[5]; if ((got = signed_val(tptr[6])) < 0) RET_TO_BIF(NIL,DB_ERROR_BADPARAM); slot_ix = signed_val(tptr[2]); if (slot_ix < 0 /* EOT */ || (chunk_size && got >= chunk_size)) { goto done; /* Already got all or enough in the match_list */ } lck = RLOCK_HASH(tb,slot_ix); if (slot_ix >= NACTIVE(tb)) { RUNLOCK_HASH(lck); RET_TO_BIF(NIL,DB_ERROR_BADPARAM); } while ((current = BUCKET(tb,slot_ix)) == NULL) { slot_ix = next_slot(tb, slot_ix, &lck); if (slot_ix == 0) { slot_ix = -1; /* EOT */ goto done; } } for(;;) { if (current->hvalue != INVALID_HASH && (match_res = db_match_dbterm(&tb->common, p, mp, all_objects, ¤t->dbterm, &hp, 2), is_value(match_res))) { match_list = CONS(hp, match_res, match_list); ++got; } --num_left; save_slot_ix = slot_ix; if ((current = next(tb, (Uint*)&slot_ix, &lck, current)) == NULL) { slot_ix = -1; /* EOT */ break; } if (slot_ix != save_slot_ix) { if (chunk_size && got >= chunk_size) { RUNLOCK_HASH(lck); break; } if (num_left <= 0 || MBUF(p)) { /* * We have either reached our limit, or just created some heap fragments. * Since many heap fragments will make the GC slower, trap and GC now. */ RUNLOCK_HASH(lck); goto trap; } } } done: BUMP_REDS(p, 1000 - num_left); if (chunk_size) { Eterm continuation; Eterm rest = NIL; Sint rest_size = 0; if (got > chunk_size) { /* Cannot write destructively here, the list may have been in user space */ rest = NIL; hp = HAlloc(p, (got - chunk_size) * 2); while (got-- > chunk_size) { rest = CONS(hp, CAR(list_val(match_list)), rest); hp += 2; match_list = CDR(list_val(match_list)); ++rest_size; } } if (rest != NIL || slot_ix >= 0) { hp = HAlloc(p,3+7); continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3], tptr[4], rest, make_small(rest_size)); hp += 7; RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE); } else { if (match_list != NIL) { hp = HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE); } else { RET_TO_BIF(am_EOT, DB_ERROR_NONE); } } } RET_TO_BIF(match_list,DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); hp = HAlloc(p,7); continuation = TUPLE6(hp, tptr[1], make_small(slot_ix), tptr[3], tptr[4], match_list, make_small(got)); RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } static int db_select_hash(Process *p, DbTable *tbl, Eterm pattern, int reverse, Eterm *ret) { return db_select_chunk_hash(p, tbl, pattern, 0, reverse, ret); } static int db_select_chunk_hash(Process *p, DbTable *tbl, Eterm pattern, Sint chunk_size, int reverse, /* not used */ Eterm *ret) { DbTableHash *tb = &tbl->hash; struct mp_info mpi; Sint slot_ix; HashDbTerm *current = 0; unsigned current_list_pos = 0; Eterm match_list; Eterm match_res; Eterm *hp; int num_left = 1000; Uint got = 0; Eterm continuation; int errcode; Eterm mpb; erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ } \ if (mpi.lists != mpi.dlists) { \ erts_free(ERTS_ALC_T_DB_SEL_LIST, \ (void *) mpi.lists); \ } \ *ret = (Term); \ return RetVal; \ } while(0) if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } if (!mpi.something_can_match) { if (chunk_size) { RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */ } RET_TO_BIF(NIL, DB_ERROR_NONE); /* can't possibly match anything */ } if (!mpi.key_given) { /* Run this code if pattern is variable or GETKEY(pattern) */ /* is a variable */ slot_ix = 0; lck = RLOCK_HASH(tb,slot_ix); for (;;) { ASSERT(slot_ix < NACTIVE(tb)); if ((current = BUCKET(tb,slot_ix)) != NULL) { break; } slot_ix = next_slot(tb,slot_ix,&lck); if (slot_ix == 0) { if (chunk_size) { RET_TO_BIF(am_EOT, DB_ERROR_NONE); /* We're done */ } RET_TO_BIF(NIL,DB_ERROR_NONE); } } } else { /* We have at least one */ slot_ix = mpi.lists[current_list_pos].ix; lck = RLOCK_HASH(tb, slot_ix); current = *(mpi.lists[current_list_pos].bucket); ASSERT(current == BUCKET(tb,slot_ix)); ++current_list_pos; } match_list = NIL; for(;;) { if (current != NULL) { if (current->hvalue != INVALID_HASH) { match_res = db_match_dbterm(&tb->common, p, mpi.mp, 0, ¤t->dbterm, &hp, 2); if (is_value(match_res)) { match_list = CONS(hp, match_res, match_list); ++got; } } current = current->next; } else if (mpi.key_given) { /* Key is bound */ RUNLOCK_HASH(lck); if (current_list_pos == mpi.num_lists) { slot_ix = -1; /* EOT */ goto done; } else { slot_ix = mpi.lists[current_list_pos].ix; lck = RLOCK_HASH(tb, slot_ix); current = *(mpi.lists[current_list_pos].bucket); ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); ++current_list_pos; } } else { /* Key is variable */ --num_left; if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) { slot_ix = -1; break; } if (chunk_size && got >= chunk_size) { RUNLOCK_HASH(lck); break; } if (num_left <= 0 || MBUF(p)) { /* * We have either reached our limit, or just created some heap fragments. * Since many heap fragments will make the GC slower, trap and GC now. */ RUNLOCK_HASH(lck); goto trap; } current = BUCKET(tb,slot_ix); } } done: BUMP_REDS(p, 1000 - num_left); if (chunk_size) { Eterm continuation; Eterm rest = NIL; Sint rest_size = 0; if (mpi.all_objects) (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; if (got > chunk_size) { /* Split list in return value and 'rest' */ Eterm tmp = match_list; rest = match_list; while (got-- > chunk_size + 1) { tmp = CDR(list_val(tmp)); ++rest_size; } ++rest_size; match_list = CDR(list_val(tmp)); CDR(list_val(tmp)) = NIL; /* Destructive, the list has never been in 'user space' */ } if (rest != NIL || slot_ix >= 0) { /* Need more calls */ hp = HAlloc(p,3+7+PROC_BIN_SIZE); mpb =db_make_mp_binary(p,(mpi.mp),&hp); if (mpi.all_objects) (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; continuation = TUPLE6(hp, tb->common.id,make_small(slot_ix), make_small(chunk_size), mpb, rest, make_small(rest_size)); mpi.mp = NULL; /*otherwise the return macro will destroy it */ hp += 7; RET_TO_BIF(TUPLE2(hp, match_list, continuation),DB_ERROR_NONE); } else { /* All data is exhausted */ if (match_list != NIL) { /* No more data to search but still a result to return to the caller */ hp = HAlloc(p, 3); RET_TO_BIF(TUPLE2(hp, match_list, am_EOT),DB_ERROR_NONE); } else { /* Reached the end of the ttable with no data to return */ RET_TO_BIF(am_EOT, DB_ERROR_NONE); } } } RET_TO_BIF(match_list,DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); if (mpi.all_objects) (mpi.mp)->flags |= BIN_FLAG_ALL_OBJECTS; hp = HAlloc(p,7+PROC_BIN_SIZE); mpb =db_make_mp_binary(p,(mpi.mp),&hp); continuation = TUPLE6(hp, tb->common.id, make_small(slot_ix), make_small(chunk_size), mpb, match_list, make_small(got)); mpi.mp = NULL; /*otherwise the return macro will destroy it */ RET_TO_BIF(bif_trap1(&ets_select_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } static int db_select_count_hash(Process *p, DbTable *tbl, Eterm pattern, Eterm *ret) { DbTableHash *tb = &tbl->hash; struct mp_info mpi; Uint slot_ix = 0; HashDbTerm* current = NULL; unsigned current_list_pos = 0; Eterm *hp; int num_left = 1000; Uint got = 0; Eterm continuation; int errcode; Eterm egot; Eterm mpb; erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ } \ if (mpi.lists != mpi.dlists) { \ erts_free(ERTS_ALC_T_DB_SEL_LIST, \ (void *) mpi.lists); \ } \ *ret = (Term); \ return RetVal; \ } while(0) if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } if (!mpi.something_can_match) { RET_TO_BIF(make_small(0), DB_ERROR_NONE); /* can't possibly match anything */ } if (!mpi.key_given) { /* Run this code if pattern is variable or GETKEY(pattern) */ /* is a variable */ slot_ix = 0; lck = RLOCK_HASH(tb,slot_ix); current = BUCKET(tb,slot_ix); } else { /* We have at least one */ slot_ix = mpi.lists[current_list_pos].ix; lck = RLOCK_HASH(tb, slot_ix); current = *(mpi.lists[current_list_pos].bucket); ASSERT(current == BUCKET(tb,slot_ix)); ++current_list_pos; } for(;;) { if (current != NULL) { if (current->hvalue != INVALID_HASH) { if (db_match_dbterm(&tb->common, p, mpi.mp, 0, ¤t->dbterm, NULL,0) == am_true) { ++got; } --num_left; } current = current->next; } else { /* next bucket */ if (mpi.key_given) { /* Key is bound */ RUNLOCK_HASH(lck); if (current_list_pos == mpi.num_lists) { goto done; } else { slot_ix = mpi.lists[current_list_pos].ix; lck = RLOCK_HASH(tb, slot_ix); current = *(mpi.lists[current_list_pos].bucket); ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); ++current_list_pos; } } else { if ((slot_ix=next_slot(tb,slot_ix,&lck)) == 0) { goto done; } if (num_left <= 0) { RUNLOCK_HASH(lck); goto trap; } current = BUCKET(tb,slot_ix); } } } done: BUMP_REDS(p, 1000 - num_left); RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { hp = HAlloc(p, PROC_BIN_SIZE + 5); egot = make_small(got); } else { hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } mpb = db_make_mp_binary(p,mpi.mp,&hp); continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), mpb, egot); mpi.mp = NULL; /*otherwise the return macro will destroy it */ RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } static int db_select_delete_hash(Process *p, DbTable *tbl, Eterm pattern, Eterm *ret) { DbTableHash *tb = &tbl->hash; struct mp_info mpi; Uint slot_ix = 0; HashDbTerm **current = NULL; unsigned current_list_pos = 0; Eterm *hp; int num_left = 1000; Uint got = 0; Eterm continuation; int errcode; Uint last_pseudo_delete = (Uint)-1; Eterm mpb; Eterm egot; #ifdef ERTS_SMP erts_aint_t fixated_by_me = tb->common.is_thread_safe ? 0 : 1; /* ToDo: something nicer */ #else erts_aint_t fixated_by_me = 0; #endif erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term,RetVal) do { \ if (mpi.mp != NULL) { \ erts_bin_free(mpi.mp); \ } \ if (mpi.lists != mpi.dlists) { \ erts_free(ERTS_ALC_T_DB_SEL_LIST, \ (void *) mpi.lists); \ } \ *ret = (Term); \ return RetVal; \ } while(0) if ((errcode = analyze_pattern(tb, pattern, &mpi)) != DB_ERROR_NONE) { RET_TO_BIF(NIL,errcode); } if (!mpi.something_can_match) { RET_TO_BIF(make_small(0), DB_ERROR_NONE); /* can't possibly match anything */ } if (!mpi.key_given) { /* Run this code if pattern is variable or GETKEY(pattern) */ /* is a variable */ lck = WLOCK_HASH(tb,slot_ix); current = &BUCKET(tb,slot_ix); } else { /* We have at least one */ slot_ix = mpi.lists[current_list_pos].ix; lck = WLOCK_HASH(tb, slot_ix); current = mpi.lists[current_list_pos++].bucket; ASSERT(*current == BUCKET(tb,slot_ix)); } for(;;) { if ((*current) == NULL) { if (mpi.key_given) { /* Key is bound */ WUNLOCK_HASH(lck); if (current_list_pos == mpi.num_lists) { goto done; } else { slot_ix = mpi.lists[current_list_pos].ix; lck = WLOCK_HASH(tb, slot_ix); current = mpi.lists[current_list_pos].bucket; ASSERT(mpi.lists[current_list_pos].bucket == &BUCKET(tb,slot_ix)); ++current_list_pos; } } else { if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) { goto done; } if (num_left <= 0) { WUNLOCK_HASH(lck); goto trap; } current = &BUCKET(tb,slot_ix); } } else if ((*current)->hvalue == INVALID_HASH) { current = &((*current)->next); } else { int did_erase = 0; if (db_match_dbterm(&tb->common, p, mpi.mp, 0, &(*current)->dbterm, NULL, 0) == am_true) { if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */ if (slot_ix != last_pseudo_delete) { add_fixed_deletion(tb, slot_ix); last_pseudo_delete = slot_ix; } (*current)->hvalue = INVALID_HASH; } else { HashDbTerm *del = *current; *current = (*current)->next; free_term(tb, del); did_erase = 1; } erts_smp_atomic_dec(&tb->common.nitems); ++got; } --num_left; if (!did_erase) { current = &((*current)->next); } } } done: BUMP_REDS(p, 1000 - num_left); if (got) { try_shrink(tb); } RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { hp = HAlloc(p, PROC_BIN_SIZE + 5); egot = make_small(got); } else { hp = HAlloc(p, BIG_UINT_HEAP_SIZE + PROC_BIN_SIZE + 5); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } mpb = db_make_mp_binary(p,mpi.mp,&hp); continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), mpb, egot); mpi.mp = NULL; /*otherwise the return macro will destroy it */ RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } /* ** This is called when select_delete traps */ static int db_select_delete_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { DbTableHash *tb = &tbl->hash; Uint slot_ix; Uint last_pseudo_delete = (Uint)-1; HashDbTerm **current = NULL; Eterm *hp; int num_left = 1000; Uint got; Eterm *tptr; Binary *mp; Eterm egot; int fixated_by_me = ONLY_WRITER(p,tb) ? 0 : 1; /* ToDo: something nicer */ erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term,RetVal) do { \ *ret = (Term); \ return RetVal; \ } while(0) tptr = tuple_val(continuation); slot_ix = unsigned_val(tptr[2]); mp = ((ProcBin *) binary_val(tptr[3]))->val; if (is_big(tptr[4])) { got = big_to_uint32(tptr[4]); } else { got = unsigned_val(tptr[4]); } lck = WLOCK_HASH(tb,slot_ix); if (slot_ix >= NACTIVE(tb)) { WUNLOCK_HASH(lck); goto done; } current = &BUCKET(tb,slot_ix); for(;;) { if ((*current) == NULL) { if ((slot_ix=next_slot_w(tb,slot_ix,&lck)) == 0) { goto done; } if (num_left <= 0) { WUNLOCK_HASH(lck); goto trap; } current = &BUCKET(tb,slot_ix); } else if ((*current)->hvalue == INVALID_HASH) { current = &((*current)->next); } else { int did_erase = 0; if (db_match_dbterm(&tb->common, p, mp, 0, &(*current)->dbterm, NULL, 0) == am_true) { if (NFIXED(tb) > fixated_by_me) { /* fixated by others? */ if (slot_ix != last_pseudo_delete) { add_fixed_deletion(tb, slot_ix); last_pseudo_delete = slot_ix; } (*current)->hvalue = INVALID_HASH; } else { HashDbTerm *del = *current; *current = (*current)->next; free_term(tb, del); did_erase = 1; } erts_smp_atomic_dec(&tb->common.nitems); ++got; } --num_left; if (!did_erase) { current = &((*current)->next); } } } done: BUMP_REDS(p, 1000 - num_left); if (got) { try_shrink(tb); } RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { hp = HAlloc(p, 5); egot = make_small(got); } else { hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), tptr[3], egot); RET_TO_BIF(bif_trap1(&ets_select_delete_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } /* ** This is called when select_count traps */ static int db_select_count_continue_hash(Process *p, DbTable *tbl, Eterm continuation, Eterm *ret) { DbTableHash *tb = &tbl->hash; Uint slot_ix; HashDbTerm* current; Eterm *hp; int num_left = 1000; Uint got; Eterm *tptr; Binary *mp; Eterm egot; erts_smp_rwmtx_t* lck; #define RET_TO_BIF(Term,RetVal) do { \ *ret = (Term); \ return RetVal; \ } while(0) tptr = tuple_val(continuation); slot_ix = unsigned_val(tptr[2]); mp = ((ProcBin *) binary_val(tptr[3]))->val; if (is_big(tptr[4])) { got = big_to_uint32(tptr[4]); } else { got = unsigned_val(tptr[4]); } lck = RLOCK_HASH(tb, slot_ix); if (slot_ix >= NACTIVE(tb)) { /* Is this posible? */ RUNLOCK_HASH(lck); goto done; } current = BUCKET(tb,slot_ix); for(;;) { if (current != NULL) { if (current->hvalue == INVALID_HASH) { current = current->next; continue; } if (db_match_dbterm(&tb->common, p, mp, 0, ¤t->dbterm, NULL, 0) == am_true) { ++got; } --num_left; current = current->next; } else { /* next bucket */ if ((slot_ix = next_slot(tb,slot_ix,&lck)) == 0) { goto done; } if (num_left <= 0) { RUNLOCK_HASH(lck); goto trap; } current = BUCKET(tb,slot_ix); } } done: BUMP_REDS(p, 1000 - num_left); RET_TO_BIF(erts_make_integer(got,p),DB_ERROR_NONE); trap: BUMP_ALL_REDS(p); if (IS_USMALL(0, got)) { hp = HAlloc(p, 5); egot = make_small(got); } else { hp = HAlloc(p, BIG_UINT_HEAP_SIZE + 5); egot = uint_to_big(got, hp); hp += BIG_UINT_HEAP_SIZE; } continuation = TUPLE4(hp, tb->common.id, make_small(slot_ix), tptr[3], egot); RET_TO_BIF(bif_trap1(&ets_select_count_continue_exp, p, continuation), DB_ERROR_NONE); #undef RET_TO_BIF } /* ** Other interface routines (not directly coupled to one bif) */ void db_initialize_hash(void) { } int db_mark_all_deleted_hash(DbTable *tbl) { DbTableHash *tb = &tbl->hash; HashDbTerm* list; int i; ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb)); for (i = 0; i < NACTIVE(tb); i++) { if ((list = BUCKET(tb,i)) != NULL) { add_fixed_deletion(tb, i); do { list->hvalue = INVALID_HASH; list = list->next; }while(list != NULL); } } erts_smp_atomic_set(&tb->common.nitems, 0); return DB_ERROR_NONE; } /* Display hash table contents (for dump) */ static void db_print_hash(int to, void *to_arg, int show, DbTable *tbl) { DbTableHash *tb = &tbl->hash; int i; erts_print(to, to_arg, "Buckets: %d \n", NACTIVE(tb)); if (show) { for (i = 0; i < NACTIVE(tb); i++) { HashDbTerm* list = BUCKET(tb,i); if (list == NULL) continue; erts_print(to, to_arg, "%d: [", i); while(list != 0) { if (list->hvalue == INVALID_HASH) erts_print(to, to_arg, "*"); if (tb->common.compress) { Eterm key = GETKEY(tb, list->dbterm.tpl); erts_print(to, to_arg, "key=%R", key, list->dbterm.tpl); } else { Eterm obj = make_tuple_rel(list->dbterm.tpl,list->dbterm.tpl); erts_print(to, to_arg, "%R", obj, list->dbterm.tpl); } if (list->next != 0) erts_print(to, to_arg, ","); list = list->next; } erts_print(to, to_arg, "]\n"); } } } /* release all memory occupied by a single table */ static int db_free_table_hash(DbTable *tbl) { while (!db_free_table_continue_hash(tbl)) ; return 0; } static int db_free_table_continue_hash(DbTable *tbl) { DbTableHash *tb = &tbl->hash; int done; FixedDeletion* fixdel = (FixedDeletion*) erts_smp_atomic_read(&tb->fixdel); ERTS_SMP_LC_ASSERT(IS_TAB_WLOCKED(tb)); done = 0; while (fixdel != NULL) { FixedDeletion *fx = fixdel; fixdel = fx->next; erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable *) tb, (void *) fx, sizeof(FixedDeletion)); ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion)); if (++done >= 2*DELETE_RECORD_LIMIT) { erts_smp_atomic_set(&tb->fixdel, (erts_aint_t)fixdel); return 0; /* Not done */ } } erts_smp_atomic_set(&tb->fixdel, (erts_aint_t)NULL); done /= 2; while(tb->nslots != 0) { free_seg(tb, 1); /* * If we have done enough work, get out here. */ if (++done >= (DELETE_RECORD_LIMIT / CHAIN_LEN / SEGSZ)) { return 0; /* Not done */ } } #ifdef ERTS_SMP if (tb->locks != NULL) { int i; for (i=0; ilocks, sizeof(DbTableHashFineLocks)); tb->locks = NULL; } #endif ASSERT(erts_smp_atomic_read(&tb->common.memory_size) == sizeof(DbTable)); return 1; /* Done */ } /* ** Utility routines. (static) */ /* ** For the select functions, analyzes the pattern and determines which ** slots should be searched. Also compiles the match program */ static int analyze_pattern(DbTableHash *tb, Eterm pattern, struct mp_info *mpi) { Eterm *ptpl; Eterm lst, tpl, ttpl; Eterm *matches,*guards, *bodies; Eterm sbuff[30]; Eterm *buff = sbuff; Eterm key = NIL; HashValue hval = NIL; int num_heads = 0; int i; mpi->lists = mpi->dlists; mpi->num_lists = 0; mpi->key_given = 1; mpi->something_can_match = 0; mpi->all_objects = 1; mpi->mp = NULL; for (lst = pattern; is_list(lst); lst = CDR(list_val(lst))) ++num_heads; if (lst != NIL) {/* proper list... */ return DB_ERROR_BADPARAM; } if (num_heads > 10) { buff = erts_alloc(ERTS_ALC_T_DB_TMP, sizeof(Eterm) * num_heads * 3); mpi->lists = erts_alloc(ERTS_ALC_T_DB_SEL_LIST, sizeof(*(mpi->lists)) * num_heads); } matches = buff; guards = buff + num_heads; bodies = buff + (num_heads * 2); i = 0; for(lst = pattern; is_list(lst); lst = CDR(list_val(lst))) { Eterm body; ttpl = CAR(list_val(lst)); if (!is_tuple(ttpl)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } ptpl = tuple_val(ttpl); if (ptpl[0] != make_arityval(3U)) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } matches[i] = tpl = ptpl[1]; guards[i] = ptpl[2]; bodies[i] = body = ptpl[3]; if (!is_list(body) || CDR(list_val(body)) != NIL || CAR(list_val(body)) != am_DollarUnderscore) { mpi->all_objects = 0; } ++i; if (!(mpi->key_given)) { continue; } if (tpl == am_Underscore || db_is_variable(tpl) != -1) { (mpi->key_given) = 0; (mpi->something_can_match) = 1; } else { key = db_getkey(tb->common.keypos, tpl); if (is_value(key)) { if (!db_has_variable(key)) { /* Bound key */ int ix, search_slot; HashDbTerm** bp; erts_smp_rwmtx_t* lck; hval = MAKE_HASH(key); lck = RLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); bp = &BUCKET(tb,ix); if (lck == NULL) { search_slot = search_list(tb,key,hval,*bp) != NULL; } else { /* No point to verify if key exist now as there may be concurrent inserters/deleters anyway */ RUNLOCK_HASH(lck); search_slot = 1; } if (search_slot) { int j; for (j=0; ; ++j) { if (j == mpi->num_lists) { mpi->lists[mpi->num_lists].bucket = bp; mpi->lists[mpi->num_lists].ix = ix; ++mpi->num_lists; break; } if (mpi->lists[j].bucket == bp) { ASSERT(mpi->lists[j].ix == ix); break; } ASSERT(mpi->lists[j].ix != ix); } mpi->something_can_match = 1; } } else { mpi->key_given = 0; mpi->something_can_match = 1; } } } } /* * It would be nice not to compile the match_spec if nothing could match, * but then the select calls would not fail like they should on bad * match specs that happen to specify non existent keys etc. */ if ((mpi->mp = db_match_compile(matches, guards, bodies, num_heads, DCOMP_TABLE, NULL)) == NULL) { if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_BADPARAM; } if (buff != sbuff) { erts_free(ERTS_ALC_T_DB_TMP, buff); } return DB_ERROR_NONE; } static struct ext_segment* alloc_ext_seg(DbTableHash* tb, unsigned seg_ix, struct segment** old_segtab) { int nsegs; struct ext_segment* eseg; switch (seg_ix) { case 0: nsegs = NSEG_1; break; case 1: nsegs = NSEG_2; break; default: nsegs = seg_ix + NSEG_INC; break; } eseg = (struct ext_segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, (DbTable *) tb, SIZEOF_EXTSEG(nsegs)); ASSERT(eseg != NULL); sys_memset(&eseg->s, 0, sizeof(struct segment)); IF_DEBUG(eseg->s.is_ext_segment = 1); eseg->prev_segtab = old_segtab; eseg->nsegs = nsegs; if (old_segtab) { ASSERT(nsegs > tb->nsegs); sys_memcpy(eseg->segtab, old_segtab, tb->nsegs*sizeof(struct segment*)); } #ifdef DEBUG sys_memset(&eseg->segtab[seg_ix], 0, (nsegs-seg_ix)*sizeof(struct segment*)); #endif eseg->segtab[seg_ix] = &eseg->s; return eseg; } /* Extend table with one new segment */ static int alloc_seg(DbTableHash *tb) { int seg_ix = tb->nslots >> SEGSZ_EXP; if (seg_ix+1 == tb->nsegs) { /* New segtab needed (extended segment) */ struct segment** segtab = SEGTAB(tb); struct ext_segment* seg = alloc_ext_seg(tb, seg_ix, segtab); if (seg == NULL) return 0; segtab[seg_ix] = &seg->s; /* We don't use the new segtab until next call (see "shrink race") */ } else { /* Just a new plain segment */ struct segment** segtab; if (seg_ix == tb->nsegs) { /* Time to start use segtab from last call */ struct ext_segment* eseg; eseg = (struct ext_segment*) SEGTAB(tb)[seg_ix-1]; MY_ASSERT(eseg!=NULL && eseg->s.is_ext_segment); erts_smp_atomic_set_relb(&tb->segtab, (erts_aint_t) eseg->segtab); tb->nsegs = eseg->nsegs; } ASSERT(seg_ix < tb->nsegs); segtab = SEGTAB(tb); ASSERT(segtab[seg_ix] == NULL); segtab[seg_ix] = (struct segment*) erts_db_alloc_fnf(ERTS_ALC_T_DB_SEG, (DbTable *) tb, sizeof(struct segment)); if (segtab[seg_ix] == NULL) return 0; sys_memset(segtab[seg_ix], 0, sizeof(struct segment)); } tb->nslots += SEGSZ; return 1; } /* Shrink table by freeing the top segment ** free_records: 1=free any records in segment, 0=assume segment is empty */ static int free_seg(DbTableHash *tb, int free_records) { int seg_ix = (tb->nslots >> SEGSZ_EXP) - 1; int bytes; struct segment** segtab = SEGTAB(tb); struct ext_segment* top = (struct ext_segment*) segtab[seg_ix]; int nrecords = 0; ASSERT(top != NULL); #ifndef DEBUG if (free_records) #endif { int i; for (i=0; is.buckets[i]; while(p != 0) { HashDbTerm* nxt = p->next; ASSERT(free_records); /* segment not empty as assumed? */ free_term(tb, p); p = nxt; ++nrecords; } } } /* The "shrink race": * We must avoid deallocating an extended segment while its segtab may * still be used by other threads. * The trick is to stop use a segtab one call earlier. That is, stop use * a segtab when the segment above it is deallocated. When the segtab is * later deallocated, it has not been used for a very long time. * It is even theoretically safe as we have by then rehashed the entire * segment, seizing *all* locks, so there cannot exist any retarded threads * still hanging in BUCKET macro with an old segtab pointer. * For this to work, we must of course allocate a new segtab one call * earlier in alloc_seg() as well. And this is also the reason why * the minimum size of the first segtab is 2 and not 1 (NSEG_1). */ if (seg_ix == tb->nsegs-1 || seg_ix==0) { /* Dealloc extended segment */ MY_ASSERT(top->s.is_ext_segment); ASSERT(segtab != top->segtab || seg_ix==0); bytes = SIZEOF_EXTSEG(top->nsegs); } else { /* Dealloc plain segment */ struct ext_segment* newtop = (struct ext_segment*) segtab[seg_ix-1]; MY_ASSERT(!top->s.is_ext_segment); if (segtab == newtop->segtab) { /* New top segment is extended */ MY_ASSERT(newtop->s.is_ext_segment); if (newtop->prev_segtab != NULL) { /* Time to use a smaller segtab */ erts_smp_atomic_set_relb(&tb->segtab, (erts_aint_t)newtop->prev_segtab); tb->nsegs = seg_ix; ASSERT(tb->nsegs == EXTSEG(SEGTAB(tb))->nsegs); } else { ASSERT(NSEG_1 > 2 && seg_ix==1); } } bytes = sizeof(struct segment); } erts_db_free(ERTS_ALC_T_DB_SEG, (DbTable *)tb, (void*)top, bytes); #ifdef DEBUG if (seg_ix > 0) { if (seg_ix < tb->nsegs) SEGTAB(tb)[seg_ix] = NULL; } else { erts_smp_atomic_set_relb(&tb->segtab, (erts_aint_t)NULL); } #endif tb->nslots -= SEGSZ; ASSERT(tb->nslots >= 0); return nrecords; } /* ** Copy terms from ptr1 until ptr2 ** works for ptr1 == ptr2 == 0 => [] ** or ptr2 == 0 */ static Eterm build_term_list(Process* p, HashDbTerm* ptr1, HashDbTerm* ptr2, DbTableHash* tb) { int sz = 0; HashDbTerm* ptr; Eterm list = NIL; Eterm copy; Eterm *hp, *hend; ptr = ptr1; while(ptr != ptr2) { if (ptr->hvalue != INVALID_HASH) sz += ptr->dbterm.size + 2; ptr = ptr->next; } hp = HAlloc(p, sz); hend = hp + sz; ptr = ptr1; while(ptr != ptr2) { if (ptr->hvalue != INVALID_HASH) { copy = db_copy_object_from_ets(&tb->common, &ptr->dbterm, &hp, &MSO(p)); list = CONS(hp, copy, list); hp += 2; } ptr = ptr->next; } HRelease(p,hend,hp); return list; } /* Grow table with one new bucket. ** Allocate new segment if needed. */ static void grow(DbTableHash* tb, int nactive) { HashDbTerm** pnext; HashDbTerm** to_pnext; HashDbTerm* p; erts_smp_rwmtx_t* lck; int from_ix; int szm; if (erts_smp_atomic_xchg(&tb->is_resizing, 1)) { return; /* already in progress */ } if (NACTIVE(tb) != nactive) { goto abort; /* already done (race) */ } /* Ensure that the slot nactive exists */ if (nactive == tb->nslots) { /* Time to get a new segment */ ASSERT((nactive & SEGSZ_MASK) == 0); if (!alloc_seg(tb)) goto abort; } ASSERT(nactive < tb->nslots); szm = erts_smp_atomic_read(&tb->szm); if (nactive <= szm) { from_ix = nactive & (szm >> 1); } else { ASSERT(nactive == szm+1); from_ix = 0; szm = (szm<<1) | 1; } lck = WLOCK_HASH(tb, from_ix); /* Now a final double check (with the from_ix lock held) * that we did not get raced by a table fixer. */ if (IS_FIXED(tb)) { WUNLOCK_HASH(lck); goto abort; } erts_smp_atomic_inc(&tb->nactive); if (from_ix == 0) { erts_smp_atomic_set_relb(&tb->szm, szm); } erts_smp_atomic_set_relb(&tb->is_resizing, 0); /* Finally, let's split the bucket. We try to do it in a smart way to keep link order and avoid unnecessary updates of next-pointers */ pnext = &BUCKET(tb, from_ix); p = *pnext; to_pnext = &BUCKET(tb, nactive); while (p != NULL) { if (p->hvalue == INVALID_HASH) { /* rare but possible with fine locking */ *pnext = p->next; free_term(tb, p); p = *pnext; } else { int ix = p->hvalue & szm; if (ix != from_ix) { ASSERT(ix == (from_ix ^ ((szm+1)>>1))); *to_pnext = p; /* Swap "from" and "to": */ from_ix = ix; to_pnext = pnext; } pnext = &p->next; p = *pnext; } } *to_pnext = NULL; WUNLOCK_HASH(lck); return; abort: erts_smp_atomic_set_relb(&tb->is_resizing, 0); } /* Shrink table by joining top bucket. ** Remove top segment if it gets empty. */ static void shrink(DbTableHash* tb, int nactive) { if (erts_smp_atomic_xchg(&tb->is_resizing, 1)) { return; /* already in progress */ } if (NACTIVE(tb) == nactive) { erts_smp_rwmtx_t* lck; int src_ix = nactive - 1; int low_szm = erts_smp_atomic_read(&tb->szm) >> 1; int dst_ix = src_ix & low_szm; ASSERT(dst_ix < src_ix); ASSERT(nactive > SEGSZ); lck = WLOCK_HASH(tb, dst_ix); /* Double check for racing table fixers */ if (!IS_FIXED(tb)) { HashDbTerm** src_bp = &BUCKET(tb, src_ix); HashDbTerm** dst_bp = &BUCKET(tb, dst_ix); HashDbTerm** bp = src_bp; /* Q: Why join lists by appending "dst" at the end of "src"? A: Must step through "src" anyway to purge pseudo deleted. */ while(*bp != NULL) { if ((*bp)->hvalue == INVALID_HASH) { HashDbTerm* deleted = *bp; *bp = deleted->next; free_term(tb, deleted); } else { bp = &(*bp)->next; } } *bp = *dst_bp; *dst_bp = *src_bp; *src_bp = NULL; erts_smp_atomic_set(&tb->nactive, src_ix); if (dst_ix == 0) { erts_smp_atomic_set_relb(&tb->szm, low_szm); } WUNLOCK_HASH(lck); if (tb->nslots - src_ix >= SEGSZ) { free_seg(tb, 0); } } else { WUNLOCK_HASH(lck); } } /*else already done */ erts_smp_atomic_set_relb(&tb->is_resizing, 0); } /* Search a list of tuples for a matching key */ static HashDbTerm* search_list(DbTableHash* tb, Eterm key, HashValue hval, HashDbTerm *list) { while (list != 0) { if (has_live_key(tb,list,key,hval)) return list; list = list->next; } return 0; } /* This function is called by the next AND the select BIF */ /* It return the next live object in a table, NULL if no more */ /* In-bucket: RLOCKED */ /* Out-bucket: RLOCKED unless NULL */ static HashDbTerm* next(DbTableHash *tb, Uint *iptr, erts_smp_rwmtx_t** lck_ptr, HashDbTerm *list) { int i; ERTS_SMP_LC_ASSERT(IS_HASH_RLOCKED(tb,*iptr)); for (list = list->next; list != NULL; list = list->next) { if (list->hvalue != INVALID_HASH) return list; } i = *iptr; while ((i=next_slot(tb, i, lck_ptr)) != 0) { list = BUCKET(tb,i); while (list != NULL) { if (list->hvalue != INVALID_HASH) { *iptr = i; return list; } list = list->next; } } /* *iptr = ??? */ return NULL; } static int db_lookup_dbterm_hash(DbTable *tbl, Eterm key, DbUpdateHandle* handle) { DbTableHash *tb = &tbl->hash; HashDbTerm* b; HashDbTerm** prevp; int ix; HashValue hval; erts_smp_rwmtx_t* lck; hval = MAKE_HASH(key); lck = WLOCK_HASH(tb,hval); ix = hash_to_ix(tb, hval); prevp = &BUCKET(tb, ix); b = *prevp; while (b != 0) { if (has_live_key(tb,b,key,hval)) { handle->tb = tbl; handle->bp = (void**) prevp; handle->dbterm = &b->dbterm; handle->mustResize = 0; handle->new_size = b->dbterm.size; #if HALFWORD_HEAP handle->abs_vec = NULL; #endif handle->lck = lck; /* KEEP hval WLOCKED, db_finalize_dbterm_hash will WUNLOCK */ return 1; } prevp = &b->next; b = *prevp; } WUNLOCK_HASH(lck); return 0; } /* Must be called after call to db_lookup_dbterm */ static void db_finalize_dbterm_hash(DbUpdateHandle* handle) { DbTable* tbl = handle->tb; HashDbTerm* oldp = (HashDbTerm*) *(handle->bp); erts_smp_rwmtx_t* lck = (erts_smp_rwmtx_t*) handle->lck; ERTS_SMP_LC_ASSERT(IS_HASH_WLOCKED(&tbl->hash,lck)); /* locked by db_lookup_dbterm_hash */ ASSERT((&oldp->dbterm == handle->dbterm) == !(tbl->common.compress && handle->mustResize)); if (handle->mustResize) { db_finalize_resize(handle, offsetof(HashDbTerm,dbterm)); WUNLOCK_HASH(lck); free_term(&tbl->hash, oldp); } else { WUNLOCK_HASH(lck); } #ifdef DEBUG handle->dbterm = 0; #endif return; } static int db_delete_all_objects_hash(Process* p, DbTable* tbl) { if (IS_FIXED(tbl)) { db_mark_all_deleted_hash(tbl); } else { db_free_table_hash(tbl); db_create_hash(p, tbl); erts_smp_atomic_set(&tbl->hash.common.nitems, 0); } return 0; } void db_foreach_offheap_hash(DbTable *tbl, void (*func)(ErlOffHeap *, void *), void * arg) { DbTableHash *tb = &tbl->hash; HashDbTerm* list; int i; int nactive = NACTIVE(tb); for (i = 0; i < nactive; i++) { list = BUCKET(tb,i); while(list != 0) { ErlOffHeap tmp_offheap; tmp_offheap.first = list->dbterm.first_oh; tmp_offheap.overhead = 0; (*func)(&tmp_offheap, arg); list->dbterm.first_oh = tmp_offheap.first; list = list->next; } } } void db_calc_stats_hash(DbTableHash* tb, DbHashStats* stats) { HashDbTerm* b; erts_smp_rwmtx_t* lck; int sum = 0; int sq_sum = 0; int ix; int len; stats->min_chain_len = INT_MAX; stats->max_chain_len = 0; ix = 0; lck = RLOCK_HASH(tb,ix); do { len = 0; for (b = BUCKET(tb,ix); b!=NULL; b=b->next) { len++; } sum += len; sq_sum += len*len; if (len < stats->min_chain_len) stats->min_chain_len = len; if (len > stats->max_chain_len) stats->max_chain_len = len; ix = next_slot(tb,ix,&lck); }while (ix); stats->avg_chain_len = (float)sum / NACTIVE(tb); stats->std_dev_chain_len = sqrt((sq_sum - stats->avg_chain_len*sum) / NACTIVE(tb)); /* Expected standard deviation from a good uniform hash function, ie binomial distribution (not taking the linear hashing into acount) */ stats->std_dev_expected = sqrt(stats->avg_chain_len * (1 - 1.0/NACTIVE(tb))); } #ifdef HARDDEBUG void db_check_table_hash(DbTable *tbl) { DbTableHash *tb = &tbl->hash; HashDbTerm* list; int j; for (j = 0; j < tb->nactive; j++) { if ((list = BUCKET(tb,j)) != 0) { while (list != 0) { if (!is_tuple(make_tuple(list->dbterm.tpl))) { erl_exit(1, "Bad term in slot %d of ets table", j); } list = list->next; } } } } #endif