From 933790021e5fa95e4e6242e3f2eb2fcf64666a57 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Sun, 9 Oct 2011 01:03:06 +0200 Subject: Use generic lock-free queue for misc aux work --- erts/emulator/beam/erl_alloc.c | 8 +- erts/emulator/beam/erl_alloc.types | 5 +- erts/emulator/beam/erl_db.c | 3 +- erts/emulator/beam/erl_lock_check.c | 2 - erts/emulator/beam/erl_process.c | 207 ++++++++++++++++++++++-------------- erts/emulator/beam/erl_process.h | 28 +++-- 6 files changed, 151 insertions(+), 102 deletions(-) (limited to 'erts') diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c index cce4b4adf0..33d6cf5f2f 100644 --- a/erts/emulator/beam/erl_alloc.c +++ b/erts/emulator/beam/erl_alloc.c @@ -3075,10 +3075,10 @@ erts_request_alloc_info(struct process *c_p, #ifdef ERTS_SMP if (erts_no_schedulers > 1) - erts_smp_schedule_misc_aux_work(1, - erts_no_schedulers, - reply_alloc_info, - (void *) air); + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + reply_alloc_info, + (void *) air); #endif reply_alloc_info((void *) air); diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4efad0197b..9f0cb681c0 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -262,15 +262,18 @@ type EXT_TERM_DATA SHORT_LIVED PROCESSES external_term_data type ZLIB STANDARD SYSTEM zlib type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map type AUX_WORK_TMO LONG_LIVED SYSTEM aux_work_timeouts +type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; # use driver_alloc which is... type THR_Q_EL DRIVER SYSTEM thr_q_element type THR_Q_EL_SL DRIVER SYSTEM sl_thr_q_element +type MISC_AUX_WORK DRIVER SYSTEM misc_aux_work +else type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element +type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work +endif type THR_Q STANDARD SYSTEM thr_queue type THR_Q_SL SHORT_LIVED SYSTEM short_lived_thr_queue @@ -290,8 +293,6 @@ type XPORTS_LIST SHORT_LIVED SYSTEM extra_port_list type PROC_LCK_WTR LONG_LIVED SYSTEM proc_lock_waiter type PROC_LCK_QS LONG_LIVED SYSTEM proc_lock_queues type RUNQ_BLNS LONG_LIVED SYSTEM run_queue_balancing -type MISC_AUX_WORK_Q LONG_LIVED SYSTEM misc_aux_work_q -type MISC_AUX_WORK SHORT_LIVED SYSTEM misc_aux_work type THR_PRGR_IDATA LONG_LIVED SYSTEM thr_prgr_internal_data type THR_PRGR_DATA LONG_LIVED SYSTEM thr_prgr_data type T_THR_PRGR_DATA SHORT_LIVED SYSTEM temp_thr_prgr_data diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 0327850cb9..259ebd838e 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -277,8 +277,7 @@ static void schedule_free_dbtable(DbTable* tb) ASSERT(scheds >= 1); ASSERT(erts_refc_read(&tb->common.ref, 0) == 0); erts_refc_init(&tb->common.ref, scheds); - ERTS_THR_MEMORY_BARRIER; - erts_smp_schedule_misc_aux_work(0, scheds, chk_free_dbtable, tb); + erts_schedule_multi_misc_aux_work(0, scheds, chk_free_dbtable, tb); #else free_dbtable(tb); #endif diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 02d1407a2d..633be0ef58 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -176,8 +176,6 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "async_id", NULL }, { "pix_lock", "address" }, { "run_queues_lists", NULL }, - { "misc_aux_work_queue", "index" }, - { "misc_aux_work_pre_alloc_lock", "address" }, { "sched_stat", NULL }, { "run_queue_sleep_list", "address" }, #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index f14a35bafd..4292522fba 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -40,6 +40,7 @@ #include "beam_bp.h" #include "erl_cpu_topology.h" #include "erl_thr_progress.h" +#include "erl_thr_queue.h" #define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS) #define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \ @@ -125,7 +126,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE]; #endif #ifdef ERTS_SMP - int erts_disable_proc_not_running_opt; static ErtsAuxWorkData *aux_thread_aux_work_data; @@ -361,6 +361,9 @@ dbg_chk_aux_work_val(erts_aint32_t value) #ifdef ERTS_SSI_AUX_WORK_MISC valid |= ERTS_SSI_AUX_WORK_MISC; #endif +#ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR + valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR; +#endif #ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM; @@ -707,37 +710,37 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs) return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs); } -#ifdef ERTS_SMP - typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t; struct erts_misc_aux_work_t_ { - erts_misc_aux_work_t *next; void (*func)(void *); void *arg; }; -typedef struct { - erts_smp_mtx_t mtx; - erts_misc_aux_work_t *first; - erts_misc_aux_work_t *last; -} erts_misc_aux_work_q_t; +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, + erts_misc_aux_work_t, + 200, + ERTS_ALC_T_MISC_AUX_WORK) typedef union { - erts_misc_aux_work_q_t data; - char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_misc_aux_work_q_t))]; + ErtsThrQ_t q; + char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; } erts_algnd_misc_aux_work_q_t; static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues; -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work, - erts_misc_aux_work_t, - 200, - ERTS_ALC_T_MISC_AUX_WORK) +static void +notify_aux_work(void *vssi) +{ + set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi, + ERTS_SSI_AUX_WORK_MISC); +} static void init_misc_aux_work(void) { int ix; + ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT; + qinit.notify = notify_aux_work; init_misc_aux_work_alloc(); @@ -746,93 +749,127 @@ init_misc_aux_work(void) sizeof(erts_algnd_misc_aux_work_q_t) * (erts_no_schedulers+1)); - for (ix = 0; ix <= erts_no_schedulers; ix++) { - erts_smp_mtx_init_x(&misc_aux_work_queues[ix].data.mtx, - "misc_aux_work_queue", - make_small(ix)); - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; +#ifdef ERTS_SMP + ix = 0; /* aux_thread + schedulers */ +#else + ix = 1; /* scheduler only */ +#endif + + for (; ix <= erts_no_schedulers; ix++) { + qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1); + erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit); + } +} + +static erts_aint32_t +misc_aux_work_clean(ErtsThrQ_t *q, + ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) +{ + switch (erts_thr_q_clean(q)) { + case ERTS_THR_Q_DIRTY: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); + return aux_work | ERTS_SSI_AUX_WORK_MISC; +#ifdef ERTS_SMP + case ERTS_THR_Q_NEED_THR_PRGR: + set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + erts_thr_progress_wakeup(awdp->esdp, + erts_thr_q_need_thr_progress(q)); +#endif + case ERTS_THR_Q_CLEAN: + break; } + return aux_work; } static erts_aint32_t handle_misc_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { - int ix = (int) awdp->sched_id; - erts_misc_aux_work_t *mawp; + ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q; unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC); - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - mawp = misc_aux_work_queues[ix].data.first; - misc_aux_work_queues[ix].data.first = NULL; - misc_aux_work_queues[ix].data.last = NULL; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); - - while (mawp) { - erts_misc_aux_work_t *free_mawp; + while (1) { + erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q); + if (!mawp) + break; mawp->func(mawp->arg); - free_mawp = mawp; - mawp = mawp->next; - misc_aux_work_free(free_mawp); + misc_aux_work_free(mawp); } - return aux_work & ~ERTS_SSI_AUX_WORK_MISC; + return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC); } -static void -smp_schedule_misc_aux_work(int ix, - void (*func)(void *), - void *arg) +#ifdef ERTS_SMP + +static erts_aint32_t +handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp, + erts_aint32_t aux_work) { - erts_aint32_t aux_work; + if (!erts_thr_progress_has_reached(awdp->misc.thr_prgr)) + return aux_work; + + unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR); + + return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q, + awdp, + aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR); +} + +#endif + +static ERTS_INLINE void +schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + ErtsThrQ_t *q; erts_misc_aux_work_t *mawp; - ErtsSchedulerSleepInfo *ssi; - mawp = misc_aux_work_alloc(); +#ifdef ERTS_SMP + ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers); +#else + ASSERT(sched_id == 1); +#endif + q = &misc_aux_work_queues[sched_id].q; + mawp = misc_aux_work_alloc(); mawp->func = func; mawp->arg = arg; - mawp->next = NULL; - - erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx); - if (!misc_aux_work_queues[ix].data.last) - misc_aux_work_queues[ix].data.first = mawp; - else - misc_aux_work_queues[ix].data.last->next = mawp; - misc_aux_work_queues[ix].data.last = mawp; - erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx); + erts_thr_q_enqueue(q, mawp); +} - set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1), - ERTS_SSI_AUX_WORK_MISC); +void +erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg) +{ + schedule_misc_aux_work(sched_id, func, arg); } void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg) +erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg) { - int ix, ignore_ix = -1; + int id, self = 0; if (ignore_self) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); if (esdp) - ignore_ix = (int) esdp->no; + self = (int) esdp->no; } ASSERT(0 < max_sched && max_sched <= erts_no_schedulers); - for (ix = 1; ix <= max_sched; ix++) { - if (ix == ignore_ix) + for (id = 1; id <= max_sched; id++) { + if (id == self) continue; - smp_schedule_misc_aux_work(ix, func, arg); - } + schedule_misc_aux_work(id, func, arg); + } } -#endif - static erts_aint32_t handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) { @@ -964,14 +1001,14 @@ prep_setup_completed_dealloc(void *vproc) erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1); if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) { /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - setup_completed_dealloc, - vproc); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + setup_completed_dealloc, + vproc); /* aux_thread */ - smp_schedule_misc_aux_work(0, - setup_completed_dealloc, - vproc); + erts_schedule_misc_aux_work(0, + setup_completed_dealloc, + vproc); } } @@ -992,14 +1029,14 @@ erts_debug_wait_deallocations(Process *c_p) erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); erts_smp_proc_inc_refc(c_p); /* scheduler threads */ - erts_smp_schedule_misc_aux_work(0, - erts_no_schedulers, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_multi_misc_aux_work(0, + erts_no_schedulers, + prep_setup_completed_dealloc, + (void *) c_p); /* aux_thread */ - smp_schedule_misc_aux_work(0, - prep_setup_completed_dealloc, - (void *) c_p); + erts_schedule_misc_aux_work(0, + prep_setup_completed_dealloc, + (void *) c_p); return 1; } return 0; @@ -1062,11 +1099,15 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work) ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } #ifdef ERTS_SMP + if (aux_work & ERTS_SSI_AUX_WORK_MISC_THR_PRGR) { + aux_work = handle_misc_aux_work_thr_prgr(awdp, aux_work); + ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); + } +#endif if (aux_work & ERTS_SSI_AUX_WORK_MISC) { aux_work = handle_misc_aux_work(awdp, aux_work); ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); } -#endif #ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) { aux_work = handle_check_children(awdp, aux_work); @@ -3191,6 +3232,7 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp) awdp->esdp = esdp; awdp->ssi = esdp ? esdp->ssi : NULL; #ifdef ERTS_SMP + awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING; awdp->dd.completed_callback = NULL; awdp->dd.completed_arg = NULL; @@ -3382,9 +3424,10 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online) init_aux_work_data(&esdp->aux_work_data, esdp); } -#ifdef ERTS_SMP init_misc_aux_work(); +#ifdef ERTS_SMP + erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */ aux_thread_aux_work_data = diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 337990a0db..895f5ae3c0 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -251,13 +251,16 @@ typedef enum { #define ERTS_SSI_AUX_WORK_SET_TMO (((erts_aint32_t) 1) << 0) #define ERTS_SSI_AUX_WORK_CHECK_CHILDREN (((erts_aint32_t) 1) << 1) #define ERTS_SSI_AUX_WORK_MISC (((erts_aint32_t) 1) << 2) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 3) -#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 4) #ifdef ERTS_SMP -#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 5) -#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_MISC_THR_PRGR (((erts_aint32_t) 1) << 3) #endif -#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 7) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM (((erts_aint32_t) 1) << 4) +#define ERTS_SSI_AUX_WORK_FIX_ALLOC_DEALLOC (((erts_aint32_t) 1) << 5) +#ifdef ERTS_SMP +#define ERTS_SSI_AUX_WORK_DD (((erts_aint32_t) 1) << 6) +#define ERTS_SSI_AUX_WORK_DD_THR_PRGR (((erts_aint32_t) 1) << 7) +#endif +#define ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK (((erts_aint32_t) 1) << 8) #if !HAVE_ERTS_MSEG # undef ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK @@ -404,6 +407,9 @@ typedef struct { ErtsSchedulerSleepInfo *ssi; struct { int ix; +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_prgr; +#endif } misc; #ifdef ERTS_SMP struct { @@ -1094,12 +1100,14 @@ Eterm erts_multi_scheduling_blockers(Process *); void erts_start_schedulers(void); void erts_alloc_notify_delayed_dealloc(int); void erts_smp_notify_check_children_needed(void); -void -erts_smp_schedule_misc_aux_work(int ignore_self, - int max_sched, - void (*func)(void *), - void *arg); #endif +void erts_schedule_misc_aux_work(int sched_id, + void (*func)(void *), + void *arg); +void erts_schedule_multi_misc_aux_work(int ignore_self, + int max_sched, + void (*func)(void *), + void *arg); erts_aint32_t erts_set_aux_work_timeout(int, erts_aint32_t, int); void erts_sched_notify_check_cpu_bind(void); Uint erts_active_schedulers(void); -- cgit v1.2.3