From 9a0970257aaaf9d343f8045548a34abf30dc0c92 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 12 May 2017 18:05:03 +0200 Subject: erts: Add multiple poll sets --- erts/emulator/beam/erl_port_task.c | 37 ++-------- erts/emulator/beam/erl_port_task.h | 22 +----- erts/emulator/beam/erl_process.c | 144 ++++++------------------------------- erts/emulator/beam/erl_process.h | 3 + erts/emulator/beam/erl_vm.h | 2 +- 5 files changed, 30 insertions(+), 178 deletions(-) (limited to 'erts/emulator/beam') diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index bdce811847..141e815d1f 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -91,8 +91,6 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q erts_atomic_read_nob(&(PP)->run_queue))); \ } while (0) -erts_atomic_t erts_port_task_outstanding_io_tasks; - #define ERTS_PT_STATE_SCHEDULED 0 #define ERTS_PT_STATE_ABORTED 1 #define ERTS_PT_STATE_EXECUTING 2 @@ -584,9 +582,12 @@ static ERTS_INLINE void reset_executed_io_task_handle(ErtsPortTask *ptp) { if (ptp->u.alive.handle) { + ErtsIoTask *itp = ErtsContainerStruct(ptp->u.alive.handle, ErtsIoTask, task); + ASSERT(ptp == handle2task(ptp->u.alive.handle)); erts_io_notify_port_task_executed(ptp->u.alive.handle); reset_port_task_handle(ptp->u.alive.handle); + erts_check_io_interrupt(itp->pollset, 1); } } @@ -1308,20 +1309,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) if (old_state != ERTS_PT_STATE_SCHEDULED) res = - 1; /* Task already aborted, executing, or executed */ else { - reset_port_task_handle(pthp); - - switch (ptp->type) { - case ERTS_PORT_TASK_INPUT: - case ERTS_PORT_TASK_OUTPUT: - ASSERT(erts_atomic_read_nob( - &erts_port_task_outstanding_io_tasks) > 0); - erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); - break; - default: - break; - } - res = 0; } } @@ -1458,7 +1446,6 @@ erts_port_task_schedule(Eterm id, va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); va_end(argp); - erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } case ERTS_PORT_TASK_PROC_SIG: { @@ -1634,13 +1621,12 @@ erts_port_task_free_port(Port *pp) * scheduling of processes. Run-queue lock should be held by caller. */ -int +void erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; ErtsPortTask *execq; int processing_busy_q; - int res = 0; int vreds = 0; int reds = 0; erts_aint_t io_tasks_executed = 0; @@ -1655,7 +1641,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) pp = pop_port(runq); if (!pp) { - res = 0; goto done; } @@ -1822,14 +1807,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_unblock_fpe(fpe_was_unmasked); ERTS_MSACC_POP_STATE_M(); - - if (io_tasks_executed) { - ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - >= io_tasks_executed); - erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks, - -1*io_tasks_executed); - } - ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); active = finalize_exec(pp, &execq, processing_busy_q); @@ -1870,15 +1847,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) } done: - res = (erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); runq->scheduler->reductions += reds; ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds); - - return res; } static void @@ -2123,8 +2096,6 @@ erts_dequeue_port(ErtsRunQueue *rq) void erts_port_task_init(void) { - erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks, - (erts_aint_t) 0); init_port_task_alloc(); init_busy_caller_table_alloc(); } diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index ffd42c9bab..ce63e4a7c0 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -61,11 +61,6 @@ typedef enum { ERTS_PORT_TASK_PROC_SIG } ErtsPortTaskType; -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -/* NOTE: Do not access any of the exported variables directly */ -extern erts_atomic_t erts_port_task_outstanding_io_tasks; -#endif - #define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0) #define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) #define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) @@ -139,10 +134,6 @@ ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp); ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp); ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp); -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void); -#endif - #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void @@ -220,21 +211,10 @@ erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp) erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING); } -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS - -ERTS_GLB_INLINE int -erts_port_task_have_outstanding_io_tasks(void) -{ - return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks) - != 0); -} - -#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */ - #endif #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -int erts_port_task_execute(ErtsRunQueue *, Port **); +void erts_port_task_execute(ErtsRunQueue *, Port **); void erts_port_task_init(void); #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 5d33c9f6d6..2a34410d5b 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -365,9 +365,6 @@ erts_sched_stat_t erts_sched_stat; static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key); -static erts_atomic32_t function_calls; - -static erts_atomic32_t doing_sys_schedule; static erts_atomic32_t no_empty_run_queues; long erts_runq_supervision_interval = 0; static ethr_event runq_supervision_event; @@ -1558,18 +1555,19 @@ erts_psd_set_init(Process *p, int ix, void *data) void -erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags) +erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, + erts_aint32_t flags) { switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) { case ERTS_SSI_FLG_POLL_SLEEPING: - erts_check_io_interrupt(1); + erts_check_io_interrupt(ssi->esdp->pollset, 1); break; case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING: /* * Thread progress blocking while poll sleeping; need * to signal on both... */ - erts_check_io_interrupt(1); + erts_check_io_interrupt(ssi->esdp->pollset, 1); /* fall through */ case ERTS_SSI_FLG_TSE_SLEEPING: erts_tse_set(ssi->event); @@ -2869,49 +2867,13 @@ erts_active_schedulers(void) return as; } - -static ERTS_INLINE void -clear_sys_scheduling(void) -{ - erts_atomic32_set_mb(&doing_sys_schedule, 0); -} - -static ERTS_INLINE int -try_set_sys_scheduling(void) -{ - return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0); -} - - static ERTS_INLINE int prepare_for_sys_schedule(int non_blocking) { - if (non_blocking && erts_eager_check_io) { - return try_set_sys_scheduling(); - } - else { - while (!erts_port_task_have_outstanding_io_tasks() - && try_set_sys_scheduling()) { - if (!erts_port_task_have_outstanding_io_tasks()) - return 1; - clear_sys_scheduling(); - } - return 0; - } + return 1; } -static ERTS_INLINE void -sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq) -{ - ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); - - ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix)); - - ASSERT(rq->waiting < 0); - rq->waiting *= -1; -} - static ERTS_INLINE void sched_waiting(Uint no, ErtsRunQueue *rq) { @@ -3083,7 +3045,7 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type) erts_tse_reset(ssi->event); else { ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING); - erts_check_io_interrupt(0); + erts_check_io_interrupt(ssi->esdp->pollset, 0); } while (1) { @@ -3272,8 +3234,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) spincount = sched_busy_wait.tse; - tse_wait: - if (ERTS_SCHEDULER_IS_DIRTY(esdp)) dirty_sched_wall_time_change(esdp, working = 0); else if (thr_prgr_active != working) @@ -3402,7 +3362,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) else { - erts_atomic32_set_relb(&function_calls, 0); + esdp->function_calls = 0; *fcalls = 0; ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp)); @@ -3428,7 +3388,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO); - ASSERT(!erts_port_task_have_outstanding_io_tasks()); LTTNG2(scheduler_poll, esdp->no, 1); erl_sys_schedule(1); /* Might give us something to do */ @@ -3459,48 +3418,10 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); goto sys_woken; } - - /* - * If we got new I/O tasks we aren't allowed to - * call erl_sys_schedule() until it is handled. - */ - if (erts_port_task_have_outstanding_io_tasks()) { - clear_sys_scheduling(); - /* - * Got to check that we still got I/O tasks; otherwise - * we have to continue checking for I/O... - */ - if (!prepare_for_sys_schedule(0)) { - spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT; - goto tse_wait; - } - } } erts_runq_lock(rq); - /* - * If we got new I/O tasks we aren't allowed to - * sleep in erl_sys_schedule(). - */ - if (erts_port_task_have_outstanding_io_tasks()) { - clear_sys_scheduling(); - - /* - * Got to check that we still got I/O tasks; otherwise - * we have to wait in erl_sys_schedule() after all... - */ - if (!prepare_for_sys_schedule(0)) { - /* - * Not allowed to wait in erl_sys_schedule; - * do tse wait instead... - */ - sched_change_waiting_sys_to_waiting(esdp->no, rq); - erts_runq_unlock(rq); - spincount = 0; - goto tse_wait; - } - } if (aux_work) { erts_runq_unlock(rq); goto sys_poll_aux_work; @@ -3517,7 +3438,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); goto sys_woken; } - ASSERT(!erts_port_task_have_outstanding_io_tasks()); goto sys_poll_aux_work; } @@ -3532,8 +3452,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) if (thr_prgr_active) erts_thr_progress_active(esdp, thr_prgr_active = 0); - ASSERT(!erts_port_task_have_outstanding_io_tasks()); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO); LTTNG2(scheduler_poll, esdp->no, 0); @@ -3561,7 +3479,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_thr_progress_active(esdp, thr_prgr_active = 1); erts_runq_lock(rq); } - clear_sys_scheduling(); if (flgs & ~(ERTS_SSI_FLG_SUSPENDED|ERTS_SSI_FLG_MSB_EXEC)) erts_atomic32_read_band_nob(&ssi->flags, (ERTS_SSI_FLG_SUSPENDED @@ -5840,6 +5757,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->run_queue = runq; if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) { esdp->no = 0; + esdp->pollset = NULL; if (runq == ERTS_DIRTY_CPU_RUNQ) esdp->type = ERTS_SCHED_DIRTY_CPU; else { @@ -5859,6 +5777,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->type = ERTS_SCHED_NORMAL; esdp->no = (Uint) num; esdp->dirty_no = 0; + esdp->pollset = erts_get_pollset(num); runq->scheduler = esdp; } esdp->dirty_shadow_process = shadow_proc; @@ -5871,6 +5790,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, shadow_proc->static_flags = ERTS_STC_FLG_SHADOW_PROC; } + esdp->function_calls = 0; + + ssi->esdp = esdp; esdp->ssi = ssi; esdp->current_process = NULL; esdp->current_port = NULL; @@ -6204,13 +6126,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online erts_atomic32_set_nob(&schdlr_sspnd.changing, set_schdlr_sspnd_change_flags); - erts_atomic32_init_nob(&doing_sys_schedule, 0); - init_misc_aux_work(); - erts_atomic32_init_nob(&function_calls, 0); - /* init port tasks */ erts_port_task_init(); @@ -7379,7 +7297,7 @@ msb_scheduler_type_switch(ErtsSchedType sched_type, calls = ERTS_MODIFIED_TIMING_INPUT_REDS + 1; else calls = INPUT_REDUCTIONS + 1; - erts_atomic32_set_nob(&function_calls, calls); + esdp->function_calls = calls; if ((nrml_prio == ERTS_MSB_NONE_PRIO_BIT) & ((dcpu_prio != ERTS_MSB_NONE_PRIO_BIT) @@ -9769,7 +9687,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } rq = erts_get_runq_current(esdp); ASSERT(esdp); - fcalls = (int) erts_atomic32_read_acqb(&function_calls); + fcalls = esdp->function_calls; actual_reds = reds = 0; erts_runq_lock(rq); } else { @@ -9795,7 +9713,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST; esdp->virtual_reds = 0; - fcalls = (int) erts_atomic32_add_read_acqb(&function_calls, reds); + esdp->function_calls += reds; + fcalls = esdp->function_calls; ASSERT(esdp && esdp == erts_get_scheduler_data()); rq = erts_get_runq_current(esdp); @@ -10044,11 +9963,11 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) ERTS_MSACC_PUSH_STATE_CACHED_M(); - erts_atomic32_set_relb(&function_calls, 0); + esdp->function_calls = 0; fcalls = 0; #if 0 /* Not needed since we wont wait in sys schedule */ - erts_check_io_interrupt(0); + erts_check_io_interrupt(esdp->pollset, 0); #endif erts_runq_unlock(rq); @@ -10063,7 +9982,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) erts_bump_timers(esdp->timer_wheel, current_time); erts_runq_lock(rq); - clear_sys_scheduling(); goto continue_check_activities_to_run; } @@ -10079,29 +9997,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) flags = ERTS_RUNQ_FLGS_GET_NOB(rq); if (flags & PORT_BIT) { - int have_outstanding_io; - have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port); - if ((!erts_eager_check_io - && have_outstanding_io - && fcalls > 2*input_reductions) - || (flags & ERTS_RUNQ_FLG_HALTING)) { - /* - * If we have performed more than 2*INPUT_REDUCTIONS since - * last call to erl_sys_schedule() and we still haven't - * handled all I/O tasks we stop running processes and - * focus completely on ports. - * - * One could argue that this is a strange behavior. The - * reason for doing it this way is that it is similar - * to the behavior before port tasks were introduced. - * We don't want to change the behavior too much, at - * least not at the time of writing. This behavior - * might change in the future. - * - * /rickard - */ - goto check_activities_to_run; - } + erts_port_task_execute(rq, &esdp->current_port); + if (flags & ERTS_RUNQ_FLG_HALTING) + goto check_activities_to_run; } /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index e63da2d9db..b372146846 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -364,6 +364,7 @@ typedef struct { } ErtsSchedulerSleepList; struct ErtsSchedulerSleepInfo_ { + struct ErtsSchedulerData_ *esdp; ErtsSchedulerSleepInfo *next; ErtsSchedulerSleepInfo *prev; erts_atomic32_t flags; @@ -631,6 +632,8 @@ struct ErtsSchedulerData_ { void *match_pseudo_process; /* erl_db_util.c:db_prog_match() */ Process *free_process; ErtsThrPrgrData thr_progress_data; + struct pollset_info* pollset; + int function_calls; ErtsSchedulerSleepInfo *ssi; Process *current_process; ErtsSchedType type; diff --git a/erts/emulator/beam/erl_vm.h b/erts/emulator/beam/erl_vm.h index 076767c7cd..82977e62ea 100644 --- a/erts/emulator/beam/erl_vm.h +++ b/erts/emulator/beam/erl_vm.h @@ -46,7 +46,7 @@ */ #define ERTS_X_REGS_ALLOCATED (MAX_REG+3) -#define INPUT_REDUCTIONS (2 * CONTEXT_REDS) +#define INPUT_REDUCTIONS (CONTEXT_REDS / 4) #define H_DEFAULT_SIZE 233 /* default (heap + stack) min size */ #define VH_DEFAULT_SIZE 32768 /* default virtual (bin) heap min size (words) */ -- cgit v1.2.3