diff options
Diffstat (limited to 'erts/emulator/beam/erl_db.c')
-rw-r--r-- | erts/emulator/beam/erl_db.c | 559 |
1 files changed, 475 insertions, 84 deletions
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index ad11d72d5f..8c4c76e187 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -114,7 +114,12 @@ static struct { #define MARK_SLOT_DEAD(i) (meta_main_tab[(i)].u.next_free |= 2) #define GET_ANY_SLOT_TAB(i) ((DbTable*)(meta_main_tab[(i)].u.next_free & ~(1|2))) /* dead or alive */ +static void +send_ets_transfer_message(Process *c_p, Process *proc, + ErtsProcLocks *locks, + DbTable *tb, Eterm heir_data); static void schedule_free_dbtable(DbTable* tb); +static void delete_sched_table(Process *c_p, DbTable *tb); static void table_dec_refc(DbTable *tb, erts_aint_t min_val) { @@ -164,16 +169,32 @@ tid2tab(Eterm tid) return tb; } +static ERTS_INLINE int +is_table_alive(DbTable *tb) +{ + erts_smp_atomic_t *tbref; + DbTable *rtb; + + tbref = erts_smp_binary_to_magic_indirection(tb->common.btid); + rtb = (DbTable *) erts_smp_atomic_read_nob(tbref); + + ASSERT(!rtb || rtb == tb); + + return !!rtb; +} + static ERTS_INLINE void -tid_clear(DbTable *tb) +tid_clear(Process *c_p, DbTable *tb) { DbTable *rtb; Binary *btid = tb->common.btid; erts_smp_atomic_t *tbref = erts_smp_binary_to_magic_indirection(btid); rtb = (DbTable *) erts_smp_atomic_xchg_nob(tbref, (erts_aint_t) NULL); ASSERT(!rtb || tb == rtb); - if (rtb) + if (rtb) { table_dec_refc(tb, 1); + delete_sched_table(c_p, tb); + } } static ERTS_INLINE Eterm @@ -348,6 +369,99 @@ static void schedule_free_dbtable(DbTable* tb) sizeof(DbTable)); } +static ERTS_INLINE void +save_sched_table(Process *c_p, DbTable *tb) +{ + ErtsSchedulerData *esdp = erts_proc_sched_data(c_p); + DbTable *first; + + ASSERT(esdp); + esdp->ets_tables.count++; + erts_smp_refc_inc(&tb->common.refc, 1); + + first = esdp->ets_tables.clist; + if (!first) { + tb->common.all.next = tb->common.all.prev = tb; + esdp->ets_tables.clist = tb; + } + else { + tb->common.all.prev = first->common.all.prev; + tb->common.all.next = first; + tb->common.all.prev->common.all.next = tb; + first->common.all.prev = tb; + } +} + +static ERTS_INLINE void +remove_sched_table(ErtsSchedulerData *esdp, DbTable *tb) +{ + ErtsEtsAllYieldData *eaydp; + ASSERT(esdp); + ASSERT(erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid)) + == (Uint32) esdp->no); + + ASSERT(esdp->ets_tables.count > 0); + esdp->ets_tables.count--; + + eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all); + if (eaydp->ongoing) { + /* ets:all() op process list from last to first... */ + if (eaydp->tab == tb) { + if (eaydp->tab == esdp->ets_tables.clist) + eaydp->tab = NULL; + else + eaydp->tab = tb->common.all.prev; + } + } + + if (tb->common.all.next == tb) { + ASSERT(tb->common.all.prev == tb); + ASSERT(esdp->ets_tables.clist == tb); + esdp->ets_tables.clist = NULL; + } + else { +#ifdef DEBUG + DbTable *tmp = esdp->ets_tables.clist; + do { + if (tmp == tb) break; + tmp = tmp->common.all.next; + } while (tmp != esdp->ets_tables.clist); + ASSERT(tmp == tb); +#endif + tb->common.all.prev->common.all.next = tb->common.all.next; + tb->common.all.next->common.all.prev = tb->common.all.prev; + + if (esdp->ets_tables.clist == tb) + esdp->ets_tables.clist = tb->common.all.next; + + } + + table_dec_refc(tb, 0); +} + +static void +scheduled_remove_sched_table(void *vtb) +{ + remove_sched_table(erts_get_scheduler_data(), (DbTable *) vtb); +} + +static void +delete_sched_table(Process *c_p, DbTable *tb) +{ + ErtsSchedulerData *esdp = erts_proc_sched_data(c_p); + Uint32 sid; + + ASSERT(esdp); + + ASSERT(tb->common.btid); + sid = erts_get_ref_numbers_thr_id(ERTS_MAGIC_BIN_REFN(tb->common.btid)); + ASSERT(1 <= sid && sid <= erts_no_schedulers); + if (sid == (Uint32) esdp->no) + remove_sched_table(esdp, tb); + else + erts_schedule_misc_aux_work((int) sid, scheduled_remove_sched_table, tb); +} + static ERTS_INLINE void db_init_lock(DbTable* tb, int use_frequent_read_lock, char *rwname, char* fixname) { @@ -1322,6 +1436,7 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) { DbTable* tb; Eterm ret; + Eterm old_name; erts_smp_rwmtx_t *lck1, *lck2; #ifdef HARDDEBUG @@ -1338,12 +1453,10 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) (void) meta_name_tab_bucket(BIF_ARG_2, &lck1); - if (is_small(BIF_ARG_1)) { - Uint slot = unsigned_val(BIF_ARG_1) & meta_main_tab_slot_mask; - lck2 = get_meta_main_tab_lock(slot); - } - else if (is_atom(BIF_ARG_1)) { - (void) meta_name_tab_bucket(BIF_ARG_1, &lck2); + if (is_atom(BIF_ARG_1)) { + old_name = BIF_ARG_1; + named_tab: + (void) meta_name_tab_bucket(old_name, &lck2); if (lck1 == lck2) lck2 = NULL; else if (lck1 > lck2) { @@ -1353,7 +1466,18 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) } } else { - BIF_ERROR(BIF_P, BADARG); + Uint slot; + tb = tid2tab(BIF_ARG_1); + if (!tb) + BIF_ERROR(BIF_P, BADARG); + else { + if (is_atom(tb->common.id)) { + old_name = tb->common.id; + goto named_tab; + } + slot = unsigned_val(tb->common.id) & meta_main_tab_slot_mask; + lck2 = get_meta_main_tab_lock(slot); + } } erts_smp_rwmtx_rwlock(lck1); @@ -1378,7 +1502,10 @@ BIF_RETTYPE ets_rename_2(BIF_ALIST_2) tb->common.id = tb->common.the_name = BIF_ARG_2; done: - ret = tb->common.id; + if (is_atom(tb->common.id)) + ret = tb->common.id; + else + ret = BIF_ARG_1; db_unlock(tb, LCK_WRITE); erts_smp_rwmtx_rwunlock(lck1); if (lck2) @@ -1622,6 +1749,8 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) if (!is_named) ret = make_tid(BIF_P, tb); + save_sched_table(BIF_P, tb); + mmtl = get_meta_main_tab_lock(slot); erts_smp_rwmtx_rwlock(mmtl); meta_main_tab[slot].u.tb = tb; @@ -1634,7 +1763,7 @@ BIF_RETTYPE ets_new_2(BIF_ALIST_2) free_slot(slot); erts_smp_rwmtx_rwunlock(mmtl); - tid_clear(tb); + tid_clear(BIF_P, tb); db_lock(tb,LCK_WRITE); free_heir_data(tb); @@ -1825,7 +1954,7 @@ BIF_RETTYPE ets_delete_1(BIF_ALIST_1) UnUseTmpHeap(3,BIF_P); } - tid_clear(tb); + tid_clear(BIF_P, tb); mmtl = get_meta_main_tab_lock(tb->common.slot); #ifdef ERTS_SMP @@ -1913,12 +2042,8 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3) db_meta_unlock(meta_pid_to_tab, LCK_WRITE_REC); db_unlock(tb,LCK_WRITE); - erts_send_message(BIF_P, to_proc, &to_locks, - TUPLE4(buf, am_ETS_TRANSFER, - tb->common.id, - from_pid, - BIF_ARG_3), - 0); + send_ets_transfer_message(BIF_P, to_proc, &to_locks, + tb, BIF_ARG_3); erts_smp_proc_unlock(to_proc, to_locks); UnUseTmpHeap(5,BIF_P); BIF_RET(am_true); @@ -2188,46 +2313,254 @@ BIF_RETTYPE ets_select_delete_2(BIF_ALIST_2) return result; } -/* -** Return a list of tables on this node -*/ -BIF_RETTYPE ets_all_0(BIF_ALIST_0) +/* + * ets:all/0 + * + * ets:all() calls ets:internal_request_all/0 which + * requests information about all tables from + * each scheduler thread. Each scheduler replies + * to the calling process with information about + * existing tables created on that specific scheduler. + */ + +struct ErtsEtsAllReq_ { + erts_smp_atomic32_t refc; + Process *proc; + ErtsOIRefStorage ref; + ErtsEtsAllReqList list[1]; /* one per scheduler */ +}; + +#define ERTS_ETS_ALL_REQ_SIZE \ + (sizeof(ErtsEtsAllReq) \ + + (sizeof(ErtsEtsAllReqList) \ + * (erts_no_schedulers - 1))) + +typedef struct { + ErtsEtsAllReq *ongoing; + ErlHeapFragment *hfrag; + DbTable *tab; + ErtsEtsAllReq *queue; +} ErtsEtsAllData; + +/* Tables handled before yielding */ +#define ERTS_ETS_ALL_TB_YCNT 200 +/* + * Min yield count required before starting + * an operation that will require yield. + */ +#define ERTS_ETS_ALL_TB_YCNT_START 10 + +#ifdef DEBUG +/* Test yielding... */ +#undef ERTS_ETS_ALL_TB_YCNT +#undef ERTS_ETS_ALL_TB_YCNT_START +#define ERTS_ETS_ALL_TB_YCNT 10 +#define ERTS_ETS_ALL_TB_YCNT_START 1 +#endif + +static int +ets_all_reply(ErtsSchedulerData *esdp, ErtsEtsAllReq **reqpp, + ErlHeapFragment **hfragpp, DbTable **tablepp, + int *yield_count_p) { - DbTable* tb; - Eterm previous; - int i; - Eterm* hp; - Eterm* hendp; - int t_tabs_cnt; - int t_top; + ErtsEtsAllReq *reqp = *reqpp; + ErlHeapFragment *hfragp = *hfragpp; + int ycount = *yield_count_p; + DbTable *tb, *first; + Uint sz; + Eterm list, msg, ref, *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; - erts_smp_spin_lock(&meta_main_tab_main_lock); - t_tabs_cnt = meta_main_tab_cnt; - t_top = meta_main_tab_top; - erts_smp_spin_unlock(&meta_main_tab_main_lock); + /* + * - save_sched_table() inserts at end of circular list. + * + * - This function scans from the end so we know that + * the amount of tables to scan wont grow even if we + * yield. + * + * - remove_sched_table() updates the table we yielded + * on if it removes it. + */ + + if (hfragp) { + /* Restart of a yielded operation... */ + ASSERT(hfragp->used_size < hfragp->alloc_size); + ohp = &hfragp->off_heap; + hp = &hfragp->mem[hfragp->used_size]; + list = *hp; + hfragp->used_size = hfragp->alloc_size; + first = esdp->ets_tables.clist; + tb = *tablepp; + } + else { + /* A new operation... */ + ASSERT(!*tablepp); - hp = HAlloc(BIF_P, 2*t_tabs_cnt); - hendp = hp + 2*t_tabs_cnt; + /* Max heap size needed... */ + sz = esdp->ets_tables.count; + sz *= ERTS_MAGIC_REF_THING_SIZE + 2; + sz += 3 + ERTS_REF_THING_SIZE; + hfragp = new_message_buffer(sz); - previous = NIL; - for(i = 0; i < t_top; i++) { - erts_smp_rwmtx_t *mmtl = get_meta_main_tab_lock(i); - erts_smp_rwmtx_rlock(mmtl); - if (IS_SLOT_ALIVE(i)) { - if (hp == hendp) { - /* Racing table creator, grab some more heap space */ - t_tabs_cnt = 10; - hp = HAlloc(BIF_P, 2*t_tabs_cnt); - hendp = hp + 2*t_tabs_cnt; - } - tb = meta_main_tab[i].u.tb; - previous = CONS(hp, tb->common.id, previous); - hp += 2; - } - erts_smp_rwmtx_runlock(mmtl); + hp = &hfragp->mem[0]; + ohp = &hfragp->off_heap; + list = NIL; + first = esdp->ets_tables.clist; + tb = first ? first->common.all.prev : NULL; + } + + if (tb) { + while (1) { + if (is_table_alive(tb)) { + Eterm tid; + if (is_atom(tb->common.id)) + tid = tb->common.id; + else + tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid); + list = CONS(hp, tid, list); + hp += 2; + } + + if (tb == first) + break; + + tb = tb->common.all.prev; + + if (--ycount <= 0) { + sz = hp - &hfragp->mem[0]; + ASSERT(hfragp->alloc_size > sz + 1); + *hp = list; + hfragp->used_size = sz; + *hfragpp = hfragp; + *reqpp = reqp; + *tablepp = tb; + *yield_count_p = 0; + return 1; /* Yield! */ + } + } + } + + ref = erts_oiref_storage_make_ref(&reqp->ref, &hp); + msg = TUPLE2(hp, ref, list); + hp += 3; + + sz = hp - &hfragp->mem[0]; + ASSERT(sz <= hfragp->alloc_size); + + hfragp = erts_resize_message_buffer(hfragp, sz, &msg, 1); + + mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = hfragp; + + erts_queue_message(reqp->proc, 0, mp, msg, am_system); + + erts_proc_dec_refc(reqp->proc); + + if (erts_smp_atomic32_dec_read_nob(&reqp->refc) == 0) + erts_free(ERTS_ALC_T_ETS_ALL_REQ, reqp); + + *reqpp = NULL; + *hfragpp = NULL; + *tablepp = NULL; + *yield_count_p = ycount; + + return 0; +} + +int +erts_handle_yielded_ets_all_request(ErtsSchedulerData *esdp, + ErtsEtsAllYieldData *eaydp) +{ + int ix = (int) esdp->no - 1; + int yc = ERTS_ETS_ALL_TB_YCNT; + + while (1) { + if (!eaydp->ongoing) { + ErtsEtsAllReq *ongoing; + + if (!eaydp->queue) + return 0; /* All work completed! */ + + if (yc < ERTS_ETS_ALL_TB_YCNT_START && yc > esdp->ets_tables.count) + return 1; /* Yield! */ + + eaydp->ongoing = ongoing = eaydp->queue; + if (ongoing->list[ix].next == ongoing) + eaydp->queue = NULL; + else { + ongoing->list[ix].next->list[ix].prev = ongoing->list[ix].prev; + ongoing->list[ix].prev->list[ix].next = ongoing->list[ix].next; + eaydp->queue = ongoing->list[ix].next; + } + ASSERT(!eaydp->hfrag); + ASSERT(!eaydp->tab); + } + + if (ets_all_reply(esdp, &eaydp->ongoing, &eaydp->hfrag, &eaydp->tab, &yc)) + return 1; /* Yield! */ + } +} + +static void +handle_ets_all_request(void *vreq) +{ + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + ErtsEtsAllYieldData *eayp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all); + ErtsEtsAllReq *req = (ErtsEtsAllReq *) vreq; + + if (!eayp->ongoing && !eayp->queue) { + /* No ets:all() operations ongoing... */ + ErlHeapFragment *hf = NULL; + DbTable *tb = NULL; + int yc = ERTS_ETS_ALL_TB_YCNT; + if (ets_all_reply(esdp, &req, &hf, &tb, &yc)) { + /* Yielded... */ + ASSERT(hf); + eayp->ongoing = req; + eayp->hfrag = hf; + eayp->tab = tb; + erts_notify_new_aux_yield_work(esdp); + } + } + else { + /* Ongoing ets:all() operations; queue up this request... */ + int ix = (int) esdp->no - 1; + if (!eayp->queue) { + req->list[ix].next = req; + req->list[ix].prev = req; + eayp->queue = req; + } + else { + req->list[ix].next = eayp->queue; + req->list[ix].prev = eayp->queue->list[ix].prev; + eayp->queue->list[ix].prev = req; + req->list[ix].prev->list[ix].next = req; + } } - HRelease(BIF_P, hendp, hp); - BIF_RET(previous); +} + +BIF_RETTYPE ets_internal_request_all_0(BIF_ALIST_0) +{ + Eterm ref = erts_make_ref(BIF_P); + ErtsEtsAllReq *req = erts_alloc(ERTS_ALC_T_ETS_ALL_REQ, + ERTS_ETS_ALL_REQ_SIZE); + erts_smp_atomic32_init_nob(&req->refc, + (erts_aint32_t) erts_no_schedulers); + erts_oiref_storage_save(&req->ref, ref); + req->proc = BIF_P; + erts_proc_add_refc(BIF_P, (Sint) erts_no_schedulers); + +#ifdef ERTS_SMP + if (erts_no_schedulers > 1) + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + handle_ets_all_request, + (void *) req); +#endif + + handle_ets_all_request((void *) req); + BIF_RET(ref); } @@ -2788,7 +3121,7 @@ BIF_RETTYPE ets_info_1(BIF_ALIST_1) */ if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) { - if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) { + if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) { BIF_RET(am_undefined); } BIF_ERROR(BIF_P, BADARG); @@ -2850,7 +3183,7 @@ BIF_RETTYPE ets_info_2(BIF_ALIST_2) Eterm ret = THE_NON_VALUE; if ((tb = db_get_table(BIF_P, BIF_ARG_1, DB_INFO, LCK_READ)) == NULL) { - if (is_atom(BIF_ARG_1) || is_small(BIF_ARG_1)) { + if (is_atom(BIF_ARG_1) || is_ref(BIF_ARG_1)) { BIF_RET(am_undefined); } BIF_ERROR(BIF_P, BADARG); @@ -3140,6 +3473,19 @@ void init_db(ErtsDbSpinCount db_spin_count) ms_delete_all = CONS(hp, ms_delete_all,NIL); } +void +erts_ets_sched_spec_data_init(ErtsSchedulerData *esdp) +{ + ErtsEtsAllYieldData *eaydp = ERTS_SCHED_AUX_YIELD_DATA(esdp, ets_all); + eaydp->ongoing = NULL; + eaydp->hfrag = NULL; + eaydp->tab = NULL; + eaydp->queue = NULL; + esdp->ets_tables.clist = NULL; + esdp->ets_tables.count = 0; +} + + #define ARRAY_CHUNK 100 typedef enum { @@ -3288,17 +3634,46 @@ retry: ASSERT(arityval(*tpv) == 1); heir_data = tpv[1]; } - erts_send_message(p, to_proc, &to_locks, - TUPLE4(buf, - am_ETS_TRANSFER, - tb->common.id, - p->common.id, - heir_data), - 0); + send_ets_transfer_message(p, to_proc, &to_locks, tb, heir_data); erts_smp_proc_unlock(to_proc, to_locks); return !0; } +static void +send_ets_transfer_message(Process *c_p, Process *proc, + ErtsProcLocks *locks, + DbTable *tb, Eterm heir_data) +{ + Uint hsz, hd_sz; + ErtsMessage *mp; + Eterm *hp; + ErlOffHeap *ohp; + Eterm tid, hd_copy, msg, sender; + + hsz = 5; + if (is_not_atom(tb->common.id)) + hsz += ERTS_MAGIC_REF_THING_SIZE; + if (is_immed(heir_data)) + hd_sz = 0; + else { + hd_sz = size_object(heir_data); + hsz += hd_sz; + } + + mp = erts_alloc_message_heap(proc, locks, hsz, &hp, &ohp); + if (is_atom(tb->common.id)) + tid = tb->common.id; + else + tid = erts_mk_magic_ref(&hp, ohp, tb->common.btid); + if (!hd_sz) + hd_copy = heir_data; + else + hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp); + sender = c_p->common.id; + msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy); + erts_queue_message(proc, *locks, mp, msg, sender); +} + /* * erts_db_process_exiting() is called when a process terminates. * It returns 0 when completely done, and !0 when it wants to @@ -3384,7 +3759,7 @@ erts_db_process_exiting(Process *c_p, ErtsProcLocks c_p_locks) #endif first_call = (tb->common.status & DB_DELETE) == 0; if (first_call) { - tid_clear(tb); + tid_clear(c_p, tb); /* Clear all access bits. */ tb->common.status &= ~(DB_PROTECTED | DB_PUBLIC @@ -3975,21 +4350,30 @@ static void print_table(fmtfn_t to, void *to_arg, int show, DbTable* tb) erts_print(to, to_arg, "Read Concurrency: %T\n", table_info(NULL, tb, am_read_concurrency)); } +typedef struct { + fmtfn_t to; + void *to_arg; + int show; +} ErtsPrintDbInfo; + +static void +db_info_print(DbTable *tb, void *vpdbip) +{ + ErtsPrintDbInfo *pdbip = (ErtsPrintDbInfo *) vpdbip; + erts_print(pdbip->to, pdbip->to_arg, "=ets:%T\n", tb->common.owner); + erts_print(pdbip->to, pdbip->to_arg, "Slot: %bpu\n", (Uint) tb); + print_table(pdbip->to, pdbip->to_arg, pdbip->show, tb); +} + void db_info(fmtfn_t to, void *to_arg, int show) /* Called by break handler */ { - int i; - for (i=0; i < db_max_tabs; i++) - if (IS_SLOT_ALIVE(i)) { - erts_print(to, to_arg, "=ets:%T\n", meta_main_tab[i].u.tb->common.owner); - erts_print(to, to_arg, "Slot: %d\n", i); - print_table(to, to_arg, show, meta_main_tab[i].u.tb); - } -#ifdef DEBUG - erts_print(to, to_arg, "=internal_ets: Process to table index\n"); - print_table(to, to_arg, show, meta_pid_to_tab); - erts_print(to, to_arg, "=internal_ets: Process to fixation index\n"); - print_table(to, to_arg, show, meta_pid_to_fixed_tab); -#endif + ErtsPrintDbInfo pdbi; + + pdbi.to = to; + pdbi.to_arg = to_arg; + pdbi.show = show; + + erts_db_foreach_table(db_info_print, &pdbi); } Uint @@ -4004,15 +4388,22 @@ erts_get_ets_misc_mem_size(void) void erts_db_foreach_table(void (*func)(DbTable *, void *), void *arg) { - int i, j; - j = 0; - for(i = 0; (i < db_max_tabs && j < meta_main_tab_cnt); i++) { - if (IS_SLOT_ALIVE(i)) { - j++; - (*func)(meta_main_tab[i].u.tb, arg); - } + int ix; + + ASSERT(erts_smp_thr_progress_is_blocking()); + + for (ix = 0; ix < erts_no_schedulers; ix++) { + ErtsSchedulerData *esdp = ERTS_SCHEDULER_IX(ix); + DbTable *first = esdp->ets_tables.clist; + if (first) { + DbTable *tb = first; + do { + if (is_table_alive(tb)) + (*func)(tb, arg); + tb = tb->common.all.next; + } while (tb != first); + } } - ASSERT(j == meta_main_tab_cnt); } /* SMP Note: May only be used when system is locked */ |