aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_process.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2011-11-13 21:42:30 +0100
committerRickard Green <[email protected]>2011-11-13 21:42:30 +0100
commitca58731b5df58aa2a8b42c583d1ba7bb929e72b2 (patch)
tree6bfe9e29277a767a240536897646016a971ead81 /erts/emulator/beam/erl_process.c
parent73ee2e00fe0389d0362e89a74d1909510da9e0fd (diff)
parentdcc7ecbf6af5420af2d5dbd0e97fc7a2e0e894a6 (diff)
downloadotp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.tar.gz
otp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.tar.bz2
otp-ca58731b5df58aa2a8b42c583d1ba7bb929e72b2.zip
Merge branch 'rickard/generic-thr-queue/OTP-9632'
* rickard/generic-thr-queue/OTP-9632: Use generic lock-free queue for async threads Use generic lock-free queue for misc aux work Implement generic lock-free queue
Diffstat (limited to 'erts/emulator/beam/erl_process.c')
-rw-r--r--erts/emulator/beam/erl_process.c293
1 files changed, 215 insertions, 78 deletions
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 6adeef2e69..a3c1c9577b 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -40,6 +40,8 @@
#include "beam_bp.h"
#include "erl_cpu_topology.h"
#include "erl_thr_progress.h"
+#include "erl_thr_queue.h"
+#include "erl_async.h"
#define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS)
#define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \
@@ -125,7 +127,6 @@ ErtsLcPSDLocks erts_psd_required_locks[ERTS_PSD_SIZE];
#endif
#ifdef ERTS_SMP
-
int erts_disable_proc_not_running_opt;
static ErtsAuxWorkData *aux_thread_aux_work_data;
@@ -361,6 +362,15 @@ dbg_chk_aux_work_val(erts_aint32_t value)
#ifdef ERTS_SSI_AUX_WORK_MISC
valid |= ERTS_SSI_AUX_WORK_MISC;
#endif
+#ifdef ERTS_SSI_AUX_WORK_MISC_THR_PRGR
+ valid |= ERTS_SSI_AUX_WORK_MISC_THR_PRGR;
+#endif
+#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY
+ valid |= ERTS_SSI_AUX_WORK_ASYNC_READY;
+#endif
+#ifdef ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN
+ valid |= ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+#endif
#ifdef ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM
valid |= ERTS_SSI_AUX_WORK_FIX_ALLOC_LOWER_LIM;
@@ -707,37 +717,37 @@ unset_aux_work_flags(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flgs)
return erts_atomic32_read_band_nob(&ssi->aux_work, ~flgs);
}
-#ifdef ERTS_SMP
-
typedef struct erts_misc_aux_work_t_ erts_misc_aux_work_t;
struct erts_misc_aux_work_t_ {
- erts_misc_aux_work_t *next;
void (*func)(void *);
void *arg;
};
-typedef struct {
- erts_smp_mtx_t mtx;
- erts_misc_aux_work_t *first;
- erts_misc_aux_work_t *last;
-} erts_misc_aux_work_q_t;
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work,
+ erts_misc_aux_work_t,
+ 200,
+ ERTS_ALC_T_MISC_AUX_WORK)
typedef union {
- erts_misc_aux_work_q_t data;
- char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_misc_aux_work_q_t))];
+ ErtsThrQ_t q;
+ char align[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
} erts_algnd_misc_aux_work_q_t;
static erts_algnd_misc_aux_work_q_t *misc_aux_work_queues;
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(misc_aux_work,
- erts_misc_aux_work_t,
- 200,
- ERTS_ALC_T_MISC_AUX_WORK)
+static void
+notify_aux_work(void *vssi)
+{
+ set_aux_work_flags_wakeup_nob((ErtsSchedulerSleepInfo *) vssi,
+ ERTS_SSI_AUX_WORK_MISC);
+}
static void
init_misc_aux_work(void)
{
int ix;
+ ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
+ qinit.notify = notify_aux_work;
init_misc_aux_work_alloc();
@@ -746,88 +756,189 @@ init_misc_aux_work(void)
sizeof(erts_algnd_misc_aux_work_q_t)
* (erts_no_schedulers+1));
- for (ix = 0; ix <= erts_no_schedulers; ix++) {
- erts_smp_mtx_init_x(&misc_aux_work_queues[ix].data.mtx,
- "misc_aux_work_queue",
- make_small(ix));
- misc_aux_work_queues[ix].data.first = NULL;
- misc_aux_work_queues[ix].data.last = NULL;
+#ifdef ERTS_SMP
+ ix = 0; /* aux_thread + schedulers */
+#else
+ ix = 1; /* scheduler only */
+#endif
+
+ for (; ix <= erts_no_schedulers; ix++) {
+ qinit.arg = (void *) ERTS_SCHED_SLEEP_INFO_IX(ix-1);
+ erts_thr_q_initialize(&misc_aux_work_queues[ix].q, &qinit);
+ }
+}
+
+static erts_aint32_t
+misc_aux_work_clean(ErtsThrQ_t *q,
+ ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ switch (erts_thr_q_clean(q)) {
+ case ERTS_THR_Q_DIRTY:
+ set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
+ return aux_work | ERTS_SSI_AUX_WORK_MISC;
+#ifdef ERTS_SMP
+ case ERTS_THR_Q_NEED_THR_PRGR:
+ set_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+ erts_thr_progress_wakeup(awdp->esdp,
+ erts_thr_q_need_thr_progress(q));
+#endif
+ case ERTS_THR_Q_CLEAN:
+ break;
}
+ return aux_work;
}
static erts_aint32_t
handle_misc_aux_work(ErtsAuxWorkData *awdp,
erts_aint32_t aux_work)
{
- int ix = (int) awdp->sched_id;
- erts_misc_aux_work_t *mawp;
+ ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q;
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC);
-
- erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx);
- mawp = misc_aux_work_queues[ix].data.first;
- misc_aux_work_queues[ix].data.first = NULL;
- misc_aux_work_queues[ix].data.last = NULL;
- erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx);
-
- while (mawp) {
- erts_misc_aux_work_t *free_mawp;
+ while (1) {
+ erts_misc_aux_work_t *mawp = erts_thr_q_dequeue(q);
+ if (!mawp)
+ break;
mawp->func(mawp->arg);
- free_mawp = mawp;
- mawp = mawp->next;
- misc_aux_work_free(free_mawp);
+ misc_aux_work_free(mawp);
}
- return aux_work & ~ERTS_SSI_AUX_WORK_MISC;
+ return misc_aux_work_clean(q, awdp, aux_work & ~ERTS_SSI_AUX_WORK_MISC);
}
-static void
-smp_schedule_misc_aux_work(int ix,
- void (*func)(void *),
- void *arg)
+#ifdef ERTS_SMP
+
+static erts_aint32_t
+handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
{
- erts_aint32_t aux_work;
+ if (!erts_thr_progress_has_reached(awdp->misc.thr_prgr))
+ return aux_work;
+
+ unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+
+ return misc_aux_work_clean(&misc_aux_work_queues[awdp->sched_id].q,
+ awdp,
+ aux_work & ~ERTS_SSI_AUX_WORK_MISC_THR_PRGR);
+}
+
+#endif
+
+static ERTS_INLINE void
+schedule_misc_aux_work(int sched_id,
+ void (*func)(void *),
+ void *arg)
+{
+ ErtsThrQ_t *q;
erts_misc_aux_work_t *mawp;
- ErtsSchedulerSleepInfo *ssi;
- mawp = misc_aux_work_alloc();
+#ifdef ERTS_SMP
+ ASSERT(0 <= sched_id && sched_id <= erts_no_schedulers);
+#else
+ ASSERT(sched_id == 1);
+#endif
+ q = &misc_aux_work_queues[sched_id].q;
+ mawp = misc_aux_work_alloc();
mawp->func = func;
mawp->arg = arg;
- mawp->next = NULL;
-
- erts_smp_mtx_lock(&misc_aux_work_queues[ix].data.mtx);
- if (!misc_aux_work_queues[ix].data.last)
- misc_aux_work_queues[ix].data.first = mawp;
- else
- misc_aux_work_queues[ix].data.last->next = mawp;
- misc_aux_work_queues[ix].data.last = mawp;
- erts_smp_mtx_unlock(&misc_aux_work_queues[ix].data.mtx);
+ erts_thr_q_enqueue(q, mawp);
+}
- set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix-1),
- ERTS_SSI_AUX_WORK_MISC);
+void
+erts_schedule_misc_aux_work(int sched_id,
+ void (*func)(void *),
+ void *arg)
+{
+ schedule_misc_aux_work(sched_id, func, arg);
}
void
-erts_smp_schedule_misc_aux_work(int ignore_self,
- int max_sched,
- void (*func)(void *),
- void *arg)
+erts_schedule_multi_misc_aux_work(int ignore_self,
+ int max_sched,
+ void (*func)(void *),
+ void *arg)
{
- int ix, ignore_ix = -1;
+ int id, self = 0;
if (ignore_self) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
if (esdp)
- ignore_ix = (int) esdp->no;
+ self = (int) esdp->no;
}
ASSERT(0 < max_sched && max_sched <= erts_no_schedulers);
- for (ix = 1; ix <= max_sched; ix++) {
- if (ix == ignore_ix)
+ for (id = 1; id <= max_sched; id++) {
+ if (id == self)
continue;
- smp_schedule_misc_aux_work(ix, func, arg);
+ schedule_misc_aux_work(id, func, arg);
+ }
+}
+
+#if ERTS_USE_ASYNC_READY_Q
+
+void
+erts_notify_check_async_ready_queue(void *vno)
+{
+ int ix = ((int) (SWord) vno) -1;
+ set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(ix),
+ ERTS_SSI_AUX_WORK_ASYNC_READY);
+}
+
+static erts_aint32_t
+handle_async_ready(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ ErtsSchedulerSleepInfo *ssi = awdp->ssi;
+ unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY);
+ if (erts_check_async_ready(awdp->async_ready.queue)) {
+ if (set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY)
+ & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) {
+ unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ aux_work &= ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+ }
+ return aux_work;
+ }
+#ifdef ERTS_SMP
+ awdp->async_ready.need_thr_prgr = 0;
+#endif
+ set_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ return ((aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY)
+ | ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+}
+
+static erts_aint32_t
+handle_async_ready_clean(ErtsAuxWorkData *awdp,
+ erts_aint32_t aux_work)
+{
+ void *thr_prgr_p;
+
+#ifdef ERTS_SMP
+ if (awdp->async_ready.need_thr_prgr
+ && !erts_thr_progress_has_reached(awdp->misc.thr_prgr)) {
+ return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+ }
+
+ awdp->async_ready.need_thr_prgr = 0;
+ thr_prgr_p = (void *) &awdp->async_ready.thr_prgr;
+#else
+ thr_prgr_p = NULL;
+#endif
+
+ switch (erts_async_ready_clean(awdp->async_ready.queue, thr_prgr_p)) {
+ case ERTS_ASYNC_READY_CLEAN:
+ unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN);
+ return aux_work & ~ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN;
+#ifdef ERTS_SMP
+ case ERTS_ASYNC_READY_NEED_THR_PRGR:
+ erts_thr_progress_wakeup(awdp->esdp,
+ awdp->async_ready.thr_prgr);
+ awdp->async_ready.need_thr_prgr = 1;
+#endif
+ default:
+ return aux_work;
}
}
@@ -964,14 +1075,14 @@ prep_setup_completed_dealloc(void *vproc)
erts_aint32_t count = (erts_aint32_t) (erts_no_schedulers+1);
if (erts_atomic32_dec_read_mb(&completed_dealloc_count) == count) {
/* scheduler threads */
- erts_smp_schedule_misc_aux_work(0,
- erts_no_schedulers,
- setup_completed_dealloc,
- vproc);
+ erts_schedule_multi_misc_aux_work(0,
+ erts_no_schedulers,
+ setup_completed_dealloc,
+ vproc);
/* aux_thread */
- smp_schedule_misc_aux_work(0,
- setup_completed_dealloc,
- vproc);
+ erts_schedule_misc_aux_work(0,
+ setup_completed_dealloc,
+ vproc);
}
}
@@ -992,14 +1103,14 @@ erts_debug_wait_deallocations(Process *c_p)
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
erts_smp_proc_inc_refc(c_p);
/* scheduler threads */
- erts_smp_schedule_misc_aux_work(0,
- erts_no_schedulers,
- prep_setup_completed_dealloc,
- (void *) c_p);
+ erts_schedule_multi_misc_aux_work(0,
+ erts_no_schedulers,
+ prep_setup_completed_dealloc,
+ (void *) c_p);
/* aux_thread */
- smp_schedule_misc_aux_work(0,
- prep_setup_completed_dealloc,
- (void *) c_p);
+ erts_schedule_misc_aux_work(0,
+ prep_setup_completed_dealloc,
+ (void *) c_p);
return 1;
}
return 0;
@@ -1062,10 +1173,24 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
}
#ifdef ERTS_SMP
+ if (aux_work & ERTS_SSI_AUX_WORK_MISC_THR_PRGR) {
+ aux_work = handle_misc_aux_work_thr_prgr(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
+#endif
if (aux_work & ERTS_SSI_AUX_WORK_MISC) {
aux_work = handle_misc_aux_work(awdp, aux_work);
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
}
+#if ERTS_USE_ASYNC_READY_Q
+ if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY) {
+ aux_work = handle_async_ready(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
+ if (aux_work & ERTS_SSI_AUX_WORK_ASYNC_READY_CLEAN) {
+ aux_work = handle_async_ready_clean(awdp, aux_work);
+ ERTS_DBG_CHK_AUX_WORK_VAL(aux_work);
+ }
#endif
#ifdef ERTS_SMP_SCHEDULERS_NEED_TO_CHECK_CHILDREN
if (aux_work & ERTS_SSI_AUX_WORK_CHECK_CHILDREN) {
@@ -3191,10 +3316,18 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp)
awdp->esdp = esdp;
awdp->ssi = esdp ? esdp->ssi : NULL;
#ifdef ERTS_SMP
+ awdp->misc.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
awdp->dd.completed_callback = NULL;
awdp->dd.completed_arg = NULL;
#endif
+#ifdef ERTS_USE_ASYNC_READY_Q
+#ifdef ERTS_SMP
+ awdp->async_ready.need_thr_prgr = 0;
+ awdp->async_ready.thr_prgr = ERTS_THR_PRGR_VAL_WAITING;
+#endif
+ awdp->async_ready.queue = NULL;
+#endif
}
void
@@ -3385,9 +3518,10 @@ erts_init_scheduling(int mrq, int no_schedulers, int no_schedulers_online)
init_aux_work_data(&esdp->aux_work_data, esdp);
}
-#ifdef ERTS_SMP
init_misc_aux_work();
+#ifdef ERTS_SMP
+
erts_atomic32_init_nob(&completed_dealloc_count, 0); /* debug only */
aux_thread_aux_work_data =
@@ -4408,6 +4542,9 @@ sched_thread_func(void *vesdp)
#if HAVE_ERTS_MSEG
erts_mseg_late_init();
#endif
+#if ERTS_USE_ASYNC_READY_Q
+ esdp->aux_work_data.async_ready.queue = erts_get_async_ready_queue(no);
+#endif
erts_sched_init_check_cpu_bind(esdp);