aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/erl_port_task.c21
-rw-r--r--erts/emulator/beam/erl_process.c65
-rw-r--r--erts/emulator/beam/erl_process.h2
3 files changed, 64 insertions, 24 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 86454fe1fa..2c26e1fd45 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -524,13 +524,8 @@ erts_port_task_schedule(Eterm id,
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- if (!pp->sched.taskq) {
- pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
- enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq;
- }
-
+ if (!pp->sched.taskq && !pp->sched.in_runq && !pp->sched.exe_taskq) {
#ifdef ERTS_SMP
- if (enq_port) {
ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
if (xrunq) {
/* Port emigrated ... */
@@ -540,10 +535,17 @@ erts_port_task_schedule(Eterm id,
if (!runq)
return -1;
}
- }
+ enq_port = !pp->sched.taskq && !pp->sched.in_runq && !pp->sched.exe_taskq;
+#else
+ enq_port = 1;
#endif
+ }
+
+ ASSERT(!enq_port || !(runq->flags & ERTS_RUNQ_FLG_SUSPENDED));
+
+ if (!pp->sched.taskq)
+ pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
- ASSERT(pp->sched.taskq);
ASSERT(ptp);
ptp->type = type;
@@ -932,8 +934,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
#endif
done:
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+ runq->scheduler->reductions += reds;
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);
return res;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 09169ba914..f8b0da1154 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -43,6 +43,9 @@
#include "erl_async.h"
#include "dtrace-wrapper.h"
+#define ERTS_DELAYED_WAKEUP_INFINITY (~(Uint64) 0)
+#define ERTS_DELAYED_WAKEUP_REDUCTIONS ((Uint64) CONTEXT_REDS/2)
+
#define ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED (2000*CONTEXT_REDS)
#define ERTS_RUNQ_CALL_CHECK_BALANCE_REDS \
(ERTS_RUNQ_CHECK_BALANCE_REDS_PER_SCHED/2)
@@ -1176,9 +1179,15 @@ haw_thr_prgr_current_check_progress(ErtsAuxWorkData *awdp)
}
static ERTS_INLINE erts_aint32_t
-handle_delayed_aux_work_wakeup(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_delayed_aux_work_wakeup(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
int jix, max_jix;
+
+ ASSERT(awdp->delayed_wakeup.next != ERTS_DELAYED_WAKEUP_INFINITY);
+
+ if (!waiting && awdp->delayed_wakeup.next > awdp->esdp->reductions)
+ return aux_work;
+
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
ERTS_THR_MEMORY_BARRIER;
@@ -1194,11 +1203,14 @@ handle_delayed_aux_work_wakeup(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
set_aux_work_flags_wakeup_nob(ERTS_SCHED_SLEEP_INFO_IX(sched-1),
aux_work);
}
+ awdp->delayed_wakeup.next = ERTS_DELAYED_WAKEUP_INFINITY;
return aux_work & ~ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP;
}
static ERTS_INLINE void
-schedule_aux_work_wakeup(ErtsAuxWorkData *awdp, int sched, erts_aint32_t aux_work)
+schedule_aux_work_wakeup(ErtsAuxWorkData *awdp,
+ int sched,
+ erts_aint32_t aux_work)
{
int jix = awdp->delayed_wakeup.sched2jix[sched];
if (jix >= 0) {
@@ -1211,7 +1223,20 @@ schedule_aux_work_wakeup(ErtsAuxWorkData *awdp, int sched, erts_aint32_t aux_wor
awdp->delayed_wakeup.job[jix].sched = sched;
awdp->delayed_wakeup.job[jix].aux_work = aux_work;
}
- set_aux_work_flags_wakeup_nob(awdp->ssi, ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
+
+ if (awdp->delayed_wakeup.next != ERTS_DELAYED_WAKEUP_INFINITY) {
+ ASSERT(erts_atomic32_read_nob(&awdp->ssi->aux_work)
+ & ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
+ }
+ else {
+ awdp->delayed_wakeup.next = (awdp->esdp->reductions
+ + ERTS_DELAYED_WAKEUP_REDUCTIONS);
+
+ ASSERT(!(erts_atomic32_read_nob(&awdp->ssi->aux_work)
+ & ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP));
+ set_aux_work_flags_wakeup_nob(awdp->ssi,
+ ERTS_SSI_AUX_WORK_DELAYED_AW_WAKEUP);
+ }
}
#endif
@@ -1290,7 +1315,8 @@ misc_aux_work_clean(ErtsThrQ_t *q,
static ERTS_INLINE erts_aint32_t
handle_misc_aux_work(ErtsAuxWorkData *awdp,
- erts_aint32_t aux_work)
+ erts_aint32_t aux_work,
+ int waiting)
{
ErtsThrQ_t *q = &misc_aux_work_queues[awdp->sched_id].q;
@@ -1310,7 +1336,8 @@ handle_misc_aux_work(ErtsAuxWorkData *awdp,
static ERTS_INLINE erts_aint32_t
handle_misc_aux_work_thr_prgr(ErtsAuxWorkData *awdp,
- erts_aint32_t aux_work)
+ erts_aint32_t aux_work,
+ int waiting)
{
if (!erts_thr_progress_has_reached_this(haw_thr_prgr_current(awdp),
awdp->misc.thr_prgr))
@@ -1389,7 +1416,8 @@ erts_notify_check_async_ready_queue(void *vno)
static ERTS_INLINE erts_aint32_t
handle_async_ready(ErtsAuxWorkData *awdp,
- erts_aint32_t aux_work)
+ erts_aint32_t aux_work,
+ int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
unset_aux_work_flags(ssi, ERTS_SSI_AUX_WORK_ASYNC_READY);
@@ -1411,7 +1439,8 @@ handle_async_ready(ErtsAuxWorkData *awdp,
static ERTS_INLINE erts_aint32_t
handle_async_ready_clean(ErtsAuxWorkData *awdp,
- erts_aint32_t aux_work)
+ erts_aint32_t aux_work,
+ int waiting)
{
void *thr_prgr_p;
@@ -1448,7 +1477,7 @@ handle_async_ready_clean(ErtsAuxWorkData *awdp,
static ERTS_INLINE erts_aint32_t
-handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_fix_alloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
erts_aint32_t res;
@@ -1482,7 +1511,7 @@ erts_alloc_notify_delayed_dealloc(int ix)
}
static ERTS_INLINE erts_aint32_t
-handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi = awdp->ssi;
int need_thr_progress = 0;
@@ -1520,7 +1549,7 @@ handle_delayed_dealloc(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
}
static ERTS_INLINE erts_aint32_t
-handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_delayed_dealloc_thr_prgr(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
ErtsSchedulerSleepInfo *ssi;
int need_thr_progress;
@@ -1719,7 +1748,7 @@ erts_smp_notify_check_children_needed(void)
}
static ERTS_INLINE erts_aint32_t
-handle_check_children(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_check_children(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_CHECK_CHILDREN);
erts_check_children();
@@ -1742,7 +1771,7 @@ erts_smp_atomic32_t erts_halt_progress;
int erts_halt_code;
static ERTS_INLINE erts_aint32_t
-handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_REAP_PORTS);
awdp->esdp->run_queue->halt_in_progress = 1;
@@ -1790,7 +1819,7 @@ handle_reap_ports(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
#if HAVE_ERTS_MSEG
static ERTS_INLINE erts_aint32_t
-handle_mseg_cache_check(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_mseg_cache_check(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_MSEG_CACHE_CHECK);
erts_mseg_cache_check();
@@ -1800,7 +1829,7 @@ handle_mseg_cache_check(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
#endif
static ERTS_INLINE erts_aint32_t
-handle_setup_aux_work_timer(ErtsAuxWorkData *awdp, erts_aint32_t aux_work)
+handle_setup_aux_work_timer(ErtsAuxWorkData *awdp, erts_aint32_t aux_work, int waiting)
{
unset_aux_work_flags(awdp->ssi, ERTS_SSI_AUX_WORK_SET_TMO);
setup_aux_work_timer();
@@ -1814,7 +1843,7 @@ handle_aux_work(ErtsAuxWorkData *awdp, erts_aint32_t orig_aux_work, int waiting)
#define HANDLE_AUX_WORK(FLG, HNDLR) \
ignore |= FLG; \
if (aux_work & FLG) { \
- aux_work = HNDLR(awdp, aux_work); \
+ aux_work = HNDLR(awdp, aux_work, waiting); \
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); \
if (!(aux_work & ~ignore)) { \
ERTS_DBG_CHK_AUX_WORK_VAL(aux_work); \
@@ -4584,6 +4613,7 @@ init_aux_work_data(ErtsAuxWorkData *awdp, ErtsSchedulerData *esdp, char *dawwp)
awdp->async_ready.queue = NULL;
#endif
#ifdef ERTS_SMP
+ awdp->delayed_wakeup.next = ERTS_DELAYED_WAKEUP_INFINITY;
if (!dawwp) {
awdp->delayed_wakeup.job = NULL;
awdp->delayed_wakeup.sched2jix = NULL;
@@ -4778,6 +4808,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online)
#ifdef ERTS_SMP
daww_ptr += daww_sz;
#endif
+
+ esdp->reductions = 0;
+
init_sched_wall_time(&esdp->sched_wall_time);
}
@@ -6863,6 +6896,8 @@ Process *schedule(Process *p, int calls)
| ERTS_PROC_LOCK_STATUS));
#endif
+ esdp->reductions += reds;
+
schedule_out_process(rq, state, p); /* Returns with rq locked! */
ERTS_PROC_REDUCTIONS_EXECUTED(rq,
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index fa6961e8a4..e789c873fb 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -478,6 +478,7 @@ typedef struct {
#endif
#ifdef ERTS_SMP
struct {
+ Uint64 next;
int *sched2jix;
int jix;
ErtsDelayedAuxWorkWakeupJob *job;
@@ -520,6 +521,7 @@ struct ErtsSchedulerData_ {
ErtsSchedAllocData alloc_data;
+ Uint64 reductions;
ErtsSchedWallTime sched_wall_time;
#ifdef ERTS_DO_VERIFY_UNUSED_TEMP_ALLOC