aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2017-05-12 18:05:03 +0200
committerLukas Larsson <[email protected]>2017-10-02 10:34:26 +0200
commit9a0970257aaaf9d343f8045548a34abf30dc0c92 (patch)
treec46a7a7e5acf262f11099d00f1b42bed89f5590b /erts/emulator/beam
parent48e77453536e49b07ddb6be63ba322ddaa5dac45 (diff)
downloadotp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.gz
otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.bz2
otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.zip
erts: Add multiple poll sets
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/erl_port_task.c37
-rw-r--r--erts/emulator/beam/erl_port_task.h22
-rw-r--r--erts/emulator/beam/erl_process.c144
-rw-r--r--erts/emulator/beam/erl_process.h3
-rw-r--r--erts/emulator/beam/erl_vm.h2
5 files changed, 30 insertions, 178 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index bdce811847..141e815d1f 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -91,8 +91,6 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q
erts_atomic_read_nob(&(PP)->run_queue))); \
} while (0)
-erts_atomic_t erts_port_task_outstanding_io_tasks;
-
#define ERTS_PT_STATE_SCHEDULED 0
#define ERTS_PT_STATE_ABORTED 1
#define ERTS_PT_STATE_EXECUTING 2
@@ -584,9 +582,12 @@ static ERTS_INLINE void
reset_executed_io_task_handle(ErtsPortTask *ptp)
{
if (ptp->u.alive.handle) {
+ ErtsIoTask *itp = ErtsContainerStruct(ptp->u.alive.handle, ErtsIoTask, task);
+
ASSERT(ptp == handle2task(ptp->u.alive.handle));
erts_io_notify_port_task_executed(ptp->u.alive.handle);
reset_port_task_handle(ptp->u.alive.handle);
+ erts_check_io_interrupt(itp->pollset, 1);
}
}
@@ -1308,20 +1309,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
if (old_state != ERTS_PT_STATE_SCHEDULED)
res = - 1; /* Task already aborted, executing, or executed */
else {
-
reset_port_task_handle(pthp);
-
- switch (ptp->type) {
- case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- ASSERT(erts_atomic_read_nob(
- &erts_port_task_outstanding_io_tasks) > 0);
- erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
- break;
- default:
- break;
- }
-
res = 0;
}
}
@@ -1458,7 +1446,6 @@ erts_port_task_schedule(Eterm id,
va_start(argp, type);
ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
va_end(argp);
- erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
@@ -1634,13 +1621,12 @@ erts_port_task_free_port(Port *pp)
* scheduling of processes. Run-queue lock should be held by caller.
*/
-int
+void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
ErtsPortTask *execq;
int processing_busy_q;
- int res = 0;
int vreds = 0;
int reds = 0;
erts_aint_t io_tasks_executed = 0;
@@ -1655,7 +1641,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
pp = pop_port(runq);
if (!pp) {
- res = 0;
goto done;
}
@@ -1822,14 +1807,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
-
- if (io_tasks_executed) {
- ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- >= io_tasks_executed);
- erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
- -1*io_tasks_executed);
- }
-
ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue));
active = finalize_exec(pp, &execq, processing_busy_q);
@@ -1870,15 +1847,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
}
done:
- res = (erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
runq->scheduler->reductions += reds;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
-
- return res;
}
static void
@@ -2123,8 +2096,6 @@ erts_dequeue_port(ErtsRunQueue *rq)
void
erts_port_task_init(void)
{
- erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
- (erts_aint_t) 0);
init_port_task_alloc();
init_busy_caller_table_alloc();
}
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index ffd42c9bab..ce63e4a7c0 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -61,11 +61,6 @@ typedef enum {
ERTS_PORT_TASK_PROC_SIG
} ErtsPortTaskType;
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-/* NOTE: Do not access any of the exported variables directly */
-extern erts_atomic_t erts_port_task_outstanding_io_tasks;
-#endif
-
#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
@@ -139,10 +134,6 @@ ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp);
ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp);
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
-#endif
-
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
@@ -220,21 +211,10 @@ erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp)
erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING);
}
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-
-ERTS_GLB_INLINE int
-erts_port_task_have_outstanding_io_tasks(void)
-{
- return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
- != 0);
-}
-
-#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */
-
#endif
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-int erts_port_task_execute(ErtsRunQueue *, Port **);
+void erts_port_task_execute(ErtsRunQueue *, Port **);
void erts_port_task_init(void);
#endif
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 5d33c9f6d6..2a34410d5b 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -365,9 +365,6 @@ erts_sched_stat_t erts_sched_stat;
static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key);
-static erts_atomic32_t function_calls;
-
-static erts_atomic32_t doing_sys_schedule;
static erts_atomic32_t no_empty_run_queues;
long erts_runq_supervision_interval = 0;
static ethr_event runq_supervision_event;
@@ -1558,18 +1555,19 @@ erts_psd_set_init(Process *p, int ix, void *data)
void
-erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags)
+erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi,
+ erts_aint32_t flags)
{
switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) {
case ERTS_SSI_FLG_POLL_SLEEPING:
- erts_check_io_interrupt(1);
+ erts_check_io_interrupt(ssi->esdp->pollset, 1);
break;
case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING:
/*
* Thread progress blocking while poll sleeping; need
* to signal on both...
*/
- erts_check_io_interrupt(1);
+ erts_check_io_interrupt(ssi->esdp->pollset, 1);
/* fall through */
case ERTS_SSI_FLG_TSE_SLEEPING:
erts_tse_set(ssi->event);
@@ -2869,50 +2867,14 @@ erts_active_schedulers(void)
return as;
}
-
-static ERTS_INLINE void
-clear_sys_scheduling(void)
-{
- erts_atomic32_set_mb(&doing_sys_schedule, 0);
-}
-
-static ERTS_INLINE int
-try_set_sys_scheduling(void)
-{
- return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0);
-}
-
-
static ERTS_INLINE int
prepare_for_sys_schedule(int non_blocking)
{
- if (non_blocking && erts_eager_check_io) {
- return try_set_sys_scheduling();
- }
- else {
- while (!erts_port_task_have_outstanding_io_tasks()
- && try_set_sys_scheduling()) {
- if (!erts_port_task_have_outstanding_io_tasks())
- return 1;
- clear_sys_scheduling();
- }
- return 0;
- }
+ return 1;
}
static ERTS_INLINE void
-sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq)
-{
- ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
-
- ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
-
- ASSERT(rq->waiting < 0);
- rq->waiting *= -1;
-}
-
-static ERTS_INLINE void
sched_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
@@ -3083,7 +3045,7 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type)
erts_tse_reset(ssi->event);
else {
ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING);
- erts_check_io_interrupt(0);
+ erts_check_io_interrupt(ssi->esdp->pollset, 0);
}
while (1) {
@@ -3272,8 +3234,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
spincount = sched_busy_wait.tse;
- tse_wait:
-
if (ERTS_SCHEDULER_IS_DIRTY(esdp))
dirty_sched_wall_time_change(esdp, working = 0);
else if (thr_prgr_active != working)
@@ -3402,7 +3362,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
else
{
- erts_atomic32_set_relb(&function_calls, 0);
+ esdp->function_calls = 0;
*fcalls = 0;
ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
@@ -3428,7 +3388,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO);
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
LTTNG2(scheduler_poll, esdp->no, 1);
erl_sys_schedule(1); /* Might give us something to do */
@@ -3459,48 +3418,10 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
-
- /*
- * If we got new I/O tasks we aren't allowed to
- * call erl_sys_schedule() until it is handled.
- */
- if (erts_port_task_have_outstanding_io_tasks()) {
- clear_sys_scheduling();
- /*
- * Got to check that we still got I/O tasks; otherwise
- * we have to continue checking for I/O...
- */
- if (!prepare_for_sys_schedule(0)) {
- spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
- goto tse_wait;
- }
- }
}
erts_runq_lock(rq);
- /*
- * If we got new I/O tasks we aren't allowed to
- * sleep in erl_sys_schedule().
- */
- if (erts_port_task_have_outstanding_io_tasks()) {
- clear_sys_scheduling();
-
- /*
- * Got to check that we still got I/O tasks; otherwise
- * we have to wait in erl_sys_schedule() after all...
- */
- if (!prepare_for_sys_schedule(0)) {
- /*
- * Not allowed to wait in erl_sys_schedule;
- * do tse wait instead...
- */
- sched_change_waiting_sys_to_waiting(esdp->no, rq);
- erts_runq_unlock(rq);
- spincount = 0;
- goto tse_wait;
- }
- }
if (aux_work) {
erts_runq_unlock(rq);
goto sys_poll_aux_work;
@@ -3517,7 +3438,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
goto sys_poll_aux_work;
}
@@ -3532,8 +3452,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 0);
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
-
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO);
LTTNG2(scheduler_poll, esdp->no, 0);
@@ -3561,7 +3479,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
erts_runq_lock(rq);
}
- clear_sys_scheduling();
if (flgs & ~(ERTS_SSI_FLG_SUSPENDED|ERTS_SSI_FLG_MSB_EXEC))
erts_atomic32_read_band_nob(&ssi->flags,
(ERTS_SSI_FLG_SUSPENDED
@@ -5840,6 +5757,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->run_queue = runq;
if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) {
esdp->no = 0;
+ esdp->pollset = NULL;
if (runq == ERTS_DIRTY_CPU_RUNQ)
esdp->type = ERTS_SCHED_DIRTY_CPU;
else {
@@ -5859,6 +5777,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->type = ERTS_SCHED_NORMAL;
esdp->no = (Uint) num;
esdp->dirty_no = 0;
+ esdp->pollset = erts_get_pollset(num);
runq->scheduler = esdp;
}
esdp->dirty_shadow_process = shadow_proc;
@@ -5871,6 +5790,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
shadow_proc->static_flags = ERTS_STC_FLG_SHADOW_PROC;
}
+ esdp->function_calls = 0;
+
+ ssi->esdp = esdp;
esdp->ssi = ssi;
esdp->current_process = NULL;
esdp->current_port = NULL;
@@ -6204,13 +6126,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online
erts_atomic32_set_nob(&schdlr_sspnd.changing,
set_schdlr_sspnd_change_flags);
- erts_atomic32_init_nob(&doing_sys_schedule, 0);
-
init_misc_aux_work();
- erts_atomic32_init_nob(&function_calls, 0);
-
/* init port tasks */
erts_port_task_init();
@@ -7379,7 +7297,7 @@ msb_scheduler_type_switch(ErtsSchedType sched_type,
calls = ERTS_MODIFIED_TIMING_INPUT_REDS + 1;
else
calls = INPUT_REDUCTIONS + 1;
- erts_atomic32_set_nob(&function_calls, calls);
+ esdp->function_calls = calls;
if ((nrml_prio == ERTS_MSB_NONE_PRIO_BIT)
& ((dcpu_prio != ERTS_MSB_NONE_PRIO_BIT)
@@ -9769,7 +9687,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
}
rq = erts_get_runq_current(esdp);
ASSERT(esdp);
- fcalls = (int) erts_atomic32_read_acqb(&function_calls);
+ fcalls = esdp->function_calls;
actual_reds = reds = 0;
erts_runq_lock(rq);
} else {
@@ -9795,7 +9713,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST;
esdp->virtual_reds = 0;
- fcalls = (int) erts_atomic32_add_read_acqb(&function_calls, reds);
+ esdp->function_calls += reds;
+ fcalls = esdp->function_calls;
ASSERT(esdp && esdp == erts_get_scheduler_data());
rq = erts_get_runq_current(esdp);
@@ -10044,11 +9963,11 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
ERTS_MSACC_PUSH_STATE_CACHED_M();
- erts_atomic32_set_relb(&function_calls, 0);
+ esdp->function_calls = 0;
fcalls = 0;
#if 0 /* Not needed since we wont wait in sys schedule */
- erts_check_io_interrupt(0);
+ erts_check_io_interrupt(esdp->pollset, 0);
#endif
erts_runq_unlock(rq);
@@ -10063,7 +9982,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
erts_bump_timers(esdp->timer_wheel, current_time);
erts_runq_lock(rq);
- clear_sys_scheduling();
goto continue_check_activities_to_run;
}
@@ -10079,29 +9997,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
if (flags & PORT_BIT) {
- int have_outstanding_io;
- have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port);
- if ((!erts_eager_check_io
- && have_outstanding_io
- && fcalls > 2*input_reductions)
- || (flags & ERTS_RUNQ_FLG_HALTING)) {
- /*
- * If we have performed more than 2*INPUT_REDUCTIONS since
- * last call to erl_sys_schedule() and we still haven't
- * handled all I/O tasks we stop running processes and
- * focus completely on ports.
- *
- * One could argue that this is a strange behavior. The
- * reason for doing it this way is that it is similar
- * to the behavior before port tasks were introduced.
- * We don't want to change the behavior too much, at
- * least not at the time of writing. This behavior
- * might change in the future.
- *
- * /rickard
- */
- goto check_activities_to_run;
- }
+ erts_port_task_execute(rq, &esdp->current_port);
+ if (flags & ERTS_RUNQ_FLG_HALTING)
+ goto check_activities_to_run;
}
/*
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index e63da2d9db..b372146846 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -364,6 +364,7 @@ typedef struct {
} ErtsSchedulerSleepList;
struct ErtsSchedulerSleepInfo_ {
+ struct ErtsSchedulerData_ *esdp;
ErtsSchedulerSleepInfo *next;
ErtsSchedulerSleepInfo *prev;
erts_atomic32_t flags;
@@ -631,6 +632,8 @@ struct ErtsSchedulerData_ {
void *match_pseudo_process; /* erl_db_util.c:db_prog_match() */
Process *free_process;
ErtsThrPrgrData thr_progress_data;
+ struct pollset_info* pollset;
+ int function_calls;
ErtsSchedulerSleepInfo *ssi;
Process *current_process;
ErtsSchedType type;
diff --git a/erts/emulator/beam/erl_vm.h b/erts/emulator/beam/erl_vm.h
index 076767c7cd..82977e62ea 100644
--- a/erts/emulator/beam/erl_vm.h
+++ b/erts/emulator/beam/erl_vm.h
@@ -46,7 +46,7 @@
*/
#define ERTS_X_REGS_ALLOCATED (MAX_REG+3)
-#define INPUT_REDUCTIONS (2 * CONTEXT_REDS)
+#define INPUT_REDUCTIONS (CONTEXT_REDS / 4)
#define H_DEFAULT_SIZE 233 /* default (heap + stack) min size */
#define VH_DEFAULT_SIZE 32768 /* default virtual (bin) heap min size (words) */