diff options
author | Sverker Eriksson <[email protected]> | 2017-05-12 18:05:03 +0200 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2017-10-02 10:34:26 +0200 |
commit | 9a0970257aaaf9d343f8045548a34abf30dc0c92 (patch) | |
tree | c46a7a7e5acf262f11099d00f1b42bed89f5590b /erts | |
parent | 48e77453536e49b07ddb6be63ba322ddaa5dac45 (diff) | |
download | otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.gz otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.tar.bz2 otp-9a0970257aaaf9d343f8045548a34abf30dc0c92.zip |
erts: Add multiple poll sets
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 37 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port_task.h | 22 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 144 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_vm.h | 2 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.c | 734 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_check_io.h | 27 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_poll.h | 1 | ||||
-rw-r--r-- | erts/emulator/sys/common/erl_sys_common_misc.c | 8 | ||||
-rw-r--r-- | erts/emulator/sys/unix/sys.c | 6 | ||||
-rw-r--r-- | erts/emulator/sys/unix/sys_drivers.c | 11 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE.erl | 56 | ||||
-rw-r--r-- | erts/emulator/test/z_SUITE.erl | 2 |
13 files changed, 543 insertions, 510 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index bdce811847..141e815d1f 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -91,8 +91,6 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q erts_atomic_read_nob(&(PP)->run_queue))); \ } while (0) -erts_atomic_t erts_port_task_outstanding_io_tasks; - #define ERTS_PT_STATE_SCHEDULED 0 #define ERTS_PT_STATE_ABORTED 1 #define ERTS_PT_STATE_EXECUTING 2 @@ -584,9 +582,12 @@ static ERTS_INLINE void reset_executed_io_task_handle(ErtsPortTask *ptp) { if (ptp->u.alive.handle) { + ErtsIoTask *itp = ErtsContainerStruct(ptp->u.alive.handle, ErtsIoTask, task); + ASSERT(ptp == handle2task(ptp->u.alive.handle)); erts_io_notify_port_task_executed(ptp->u.alive.handle); reset_port_task_handle(ptp->u.alive.handle); + erts_check_io_interrupt(itp->pollset, 1); } } @@ -1308,20 +1309,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) if (old_state != ERTS_PT_STATE_SCHEDULED) res = - 1; /* Task already aborted, executing, or executed */ else { - reset_port_task_handle(pthp); - - switch (ptp->type) { - case ERTS_PORT_TASK_INPUT: - case ERTS_PORT_TASK_OUTPUT: - ASSERT(erts_atomic_read_nob( - &erts_port_task_outstanding_io_tasks) > 0); - erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); - break; - default: - break; - } - res = 0; } } @@ -1458,7 +1446,6 @@ erts_port_task_schedule(Eterm id, va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); va_end(argp); - erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } case ERTS_PORT_TASK_PROC_SIG: { @@ -1634,13 +1621,12 @@ erts_port_task_free_port(Port *pp) * scheduling of processes. Run-queue lock should be held by caller. */ -int +void erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; ErtsPortTask *execq; int processing_busy_q; - int res = 0; int vreds = 0; int reds = 0; erts_aint_t io_tasks_executed = 0; @@ -1655,7 +1641,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) pp = pop_port(runq); if (!pp) { - res = 0; goto done; } @@ -1822,14 +1807,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_unblock_fpe(fpe_was_unmasked); ERTS_MSACC_POP_STATE_M(); - - if (io_tasks_executed) { - ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - >= io_tasks_executed); - erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks, - -1*io_tasks_executed); - } - ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); active = finalize_exec(pp, &execq, processing_busy_q); @@ -1870,15 +1847,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) } done: - res = (erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); runq->scheduler->reductions += reds; ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds); - - return res; } static void @@ -2123,8 +2096,6 @@ erts_dequeue_port(ErtsRunQueue *rq) void erts_port_task_init(void) { - erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks, - (erts_aint_t) 0); init_port_task_alloc(); init_busy_caller_table_alloc(); } diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index ffd42c9bab..ce63e4a7c0 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -61,11 +61,6 @@ typedef enum { ERTS_PORT_TASK_PROC_SIG } ErtsPortTaskType; -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -/* NOTE: Do not access any of the exported variables directly */ -extern erts_atomic_t erts_port_task_outstanding_io_tasks; -#endif - #define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0) #define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) #define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) @@ -139,10 +134,6 @@ ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp); ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp); ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp); -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void); -#endif - #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE void @@ -220,21 +211,10 @@ erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp) erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING); } -#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS - -ERTS_GLB_INLINE int -erts_port_task_have_outstanding_io_tasks(void) -{ - return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks) - != 0); -} - -#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */ - #endif #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS -int erts_port_task_execute(ErtsRunQueue *, Port **); +void erts_port_task_execute(ErtsRunQueue *, Port **); void erts_port_task_init(void); #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 5d33c9f6d6..2a34410d5b 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -365,9 +365,6 @@ erts_sched_stat_t erts_sched_stat; static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key); -static erts_atomic32_t function_calls; - -static erts_atomic32_t doing_sys_schedule; static erts_atomic32_t no_empty_run_queues; long erts_runq_supervision_interval = 0; static ethr_event runq_supervision_event; @@ -1558,18 +1555,19 @@ erts_psd_set_init(Process *p, int ix, void *data) void -erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags) +erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, + erts_aint32_t flags) { switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) { case ERTS_SSI_FLG_POLL_SLEEPING: - erts_check_io_interrupt(1); + erts_check_io_interrupt(ssi->esdp->pollset, 1); break; case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING: /* * Thread progress blocking while poll sleeping; need * to signal on both... */ - erts_check_io_interrupt(1); + erts_check_io_interrupt(ssi->esdp->pollset, 1); /* fall through */ case ERTS_SSI_FLG_TSE_SLEEPING: erts_tse_set(ssi->event); @@ -2869,50 +2867,14 @@ erts_active_schedulers(void) return as; } - -static ERTS_INLINE void -clear_sys_scheduling(void) -{ - erts_atomic32_set_mb(&doing_sys_schedule, 0); -} - -static ERTS_INLINE int -try_set_sys_scheduling(void) -{ - return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0); -} - - static ERTS_INLINE int prepare_for_sys_schedule(int non_blocking) { - if (non_blocking && erts_eager_check_io) { - return try_set_sys_scheduling(); - } - else { - while (!erts_port_task_have_outstanding_io_tasks() - && try_set_sys_scheduling()) { - if (!erts_port_task_have_outstanding_io_tasks()) - return 1; - clear_sys_scheduling(); - } - return 0; - } + return 1; } static ERTS_INLINE void -sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq) -{ - ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); - - ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix)); - - ASSERT(rq->waiting < 0); - rq->waiting *= -1; -} - -static ERTS_INLINE void sched_waiting(Uint no, ErtsRunQueue *rq) { ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); @@ -3083,7 +3045,7 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type) erts_tse_reset(ssi->event); else { ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING); - erts_check_io_interrupt(0); + erts_check_io_interrupt(ssi->esdp->pollset, 0); } while (1) { @@ -3272,8 +3234,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) spincount = sched_busy_wait.tse; - tse_wait: - if (ERTS_SCHEDULER_IS_DIRTY(esdp)) dirty_sched_wall_time_change(esdp, working = 0); else if (thr_prgr_active != working) @@ -3402,7 +3362,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) else { - erts_atomic32_set_relb(&function_calls, 0); + esdp->function_calls = 0; *fcalls = 0; ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp)); @@ -3428,7 +3388,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO); - ASSERT(!erts_port_task_have_outstanding_io_tasks()); LTTNG2(scheduler_poll, esdp->no, 1); erl_sys_schedule(1); /* Might give us something to do */ @@ -3459,48 +3418,10 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); goto sys_woken; } - - /* - * If we got new I/O tasks we aren't allowed to - * call erl_sys_schedule() until it is handled. - */ - if (erts_port_task_have_outstanding_io_tasks()) { - clear_sys_scheduling(); - /* - * Got to check that we still got I/O tasks; otherwise - * we have to continue checking for I/O... - */ - if (!prepare_for_sys_schedule(0)) { - spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT; - goto tse_wait; - } - } } erts_runq_lock(rq); - /* - * If we got new I/O tasks we aren't allowed to - * sleep in erl_sys_schedule(). - */ - if (erts_port_task_have_outstanding_io_tasks()) { - clear_sys_scheduling(); - - /* - * Got to check that we still got I/O tasks; otherwise - * we have to wait in erl_sys_schedule() after all... - */ - if (!prepare_for_sys_schedule(0)) { - /* - * Not allowed to wait in erl_sys_schedule; - * do tse wait instead... - */ - sched_change_waiting_sys_to_waiting(esdp->no, rq); - erts_runq_unlock(rq); - spincount = 0; - goto tse_wait; - } - } if (aux_work) { erts_runq_unlock(rq); goto sys_poll_aux_work; @@ -3517,7 +3438,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING)); goto sys_woken; } - ASSERT(!erts_port_task_have_outstanding_io_tasks()); goto sys_poll_aux_work; } @@ -3532,8 +3452,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) if (thr_prgr_active) erts_thr_progress_active(esdp, thr_prgr_active = 0); - ASSERT(!erts_port_task_have_outstanding_io_tasks()); - ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO); LTTNG2(scheduler_poll, esdp->no, 0); @@ -3561,7 +3479,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq) erts_thr_progress_active(esdp, thr_prgr_active = 1); erts_runq_lock(rq); } - clear_sys_scheduling(); if (flgs & ~(ERTS_SSI_FLG_SUSPENDED|ERTS_SSI_FLG_MSB_EXEC)) erts_atomic32_read_band_nob(&ssi->flags, (ERTS_SSI_FLG_SUSPENDED @@ -5840,6 +5757,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->run_queue = runq; if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) { esdp->no = 0; + esdp->pollset = NULL; if (runq == ERTS_DIRTY_CPU_RUNQ) esdp->type = ERTS_SCHED_DIRTY_CPU; else { @@ -5859,6 +5777,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->type = ERTS_SCHED_NORMAL; esdp->no = (Uint) num; esdp->dirty_no = 0; + esdp->pollset = erts_get_pollset(num); runq->scheduler = esdp; } esdp->dirty_shadow_process = shadow_proc; @@ -5871,6 +5790,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, shadow_proc->static_flags = ERTS_STC_FLG_SHADOW_PROC; } + esdp->function_calls = 0; + + ssi->esdp = esdp; esdp->ssi = ssi; esdp->current_process = NULL; esdp->current_port = NULL; @@ -6204,13 +6126,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online erts_atomic32_set_nob(&schdlr_sspnd.changing, set_schdlr_sspnd_change_flags); - erts_atomic32_init_nob(&doing_sys_schedule, 0); - init_misc_aux_work(); - erts_atomic32_init_nob(&function_calls, 0); - /* init port tasks */ erts_port_task_init(); @@ -7379,7 +7297,7 @@ msb_scheduler_type_switch(ErtsSchedType sched_type, calls = ERTS_MODIFIED_TIMING_INPUT_REDS + 1; else calls = INPUT_REDUCTIONS + 1; - erts_atomic32_set_nob(&function_calls, calls); + esdp->function_calls = calls; if ((nrml_prio == ERTS_MSB_NONE_PRIO_BIT) & ((dcpu_prio != ERTS_MSB_NONE_PRIO_BIT) @@ -9769,7 +9687,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } rq = erts_get_runq_current(esdp); ASSERT(esdp); - fcalls = (int) erts_atomic32_read_acqb(&function_calls); + fcalls = esdp->function_calls; actual_reds = reds = 0; erts_runq_lock(rq); } else { @@ -9795,7 +9713,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST; esdp->virtual_reds = 0; - fcalls = (int) erts_atomic32_add_read_acqb(&function_calls, reds); + esdp->function_calls += reds; + fcalls = esdp->function_calls; ASSERT(esdp && esdp == erts_get_scheduler_data()); rq = erts_get_runq_current(esdp); @@ -10044,11 +9963,11 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) ERTS_MSACC_PUSH_STATE_CACHED_M(); - erts_atomic32_set_relb(&function_calls, 0); + esdp->function_calls = 0; fcalls = 0; #if 0 /* Not needed since we wont wait in sys schedule */ - erts_check_io_interrupt(0); + erts_check_io_interrupt(esdp->pollset, 0); #endif erts_runq_unlock(rq); @@ -10063,7 +9982,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) erts_bump_timers(esdp->timer_wheel, current_time); erts_runq_lock(rq); - clear_sys_scheduling(); goto continue_check_activities_to_run; } @@ -10079,29 +9997,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) flags = ERTS_RUNQ_FLGS_GET_NOB(rq); if (flags & PORT_BIT) { - int have_outstanding_io; - have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port); - if ((!erts_eager_check_io - && have_outstanding_io - && fcalls > 2*input_reductions) - || (flags & ERTS_RUNQ_FLG_HALTING)) { - /* - * If we have performed more than 2*INPUT_REDUCTIONS since - * last call to erl_sys_schedule() and we still haven't - * handled all I/O tasks we stop running processes and - * focus completely on ports. - * - * One could argue that this is a strange behavior. The - * reason for doing it this way is that it is similar - * to the behavior before port tasks were introduced. - * We don't want to change the behavior too much, at - * least not at the time of writing. This behavior - * might change in the future. - * - * /rickard - */ - goto check_activities_to_run; - } + erts_port_task_execute(rq, &esdp->current_port); + if (flags & ERTS_RUNQ_FLG_HALTING) + goto check_activities_to_run; } /* diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index e63da2d9db..b372146846 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -364,6 +364,7 @@ typedef struct { } ErtsSchedulerSleepList; struct ErtsSchedulerSleepInfo_ { + struct ErtsSchedulerData_ *esdp; ErtsSchedulerSleepInfo *next; ErtsSchedulerSleepInfo *prev; erts_atomic32_t flags; @@ -631,6 +632,8 @@ struct ErtsSchedulerData_ { void *match_pseudo_process; /* erl_db_util.c:db_prog_match() */ Process *free_process; ErtsThrPrgrData thr_progress_data; + struct pollset_info* pollset; + int function_calls; ErtsSchedulerSleepInfo *ssi; Process *current_process; ErtsSchedType type; diff --git a/erts/emulator/beam/erl_vm.h b/erts/emulator/beam/erl_vm.h index 076767c7cd..82977e62ea 100644 --- a/erts/emulator/beam/erl_vm.h +++ b/erts/emulator/beam/erl_vm.h @@ -46,7 +46,7 @@ */ #define ERTS_X_REGS_ALLOCATED (MAX_REG+3) -#define INPUT_REDUCTIONS (2 * CONTEXT_REDS) +#define INPUT_REDUCTIONS (CONTEXT_REDS / 4) #define H_DEFAULT_SIZE 233 /* default (heap + stack) min size */ #define VH_DEFAULT_SIZE 32768 /* default virtual (bin) heap min size (words) */ diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 52584e4246..ff0f3ea121 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -94,6 +94,7 @@ static struct pollset_info { ErtsPollSet ps; erts_atomic_t in_poll_wait; /* set while doing poll */ + erts_atomic_t check_io_time; struct { int six; /* start index */ int eix; /* end index */ @@ -102,7 +103,7 @@ static struct pollset_info ErtsSysFdType *array; } active_fd; erts_atomic_t removed_list; /* struct removed_fd* */ -}pollset; +}*pollsetv; #define NUM_OF_POLLSETS 1 @@ -114,21 +115,39 @@ int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags Uint ERTS_CIO_EXPORT(erts_check_io_size)(void); Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *); int ERTS_CIO_EXPORT(erts_check_io_max_files)(void); -void ERTS_CIO_EXPORT(erts_check_io_interrupt)(int); -void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int, ErtsMonotonicTime); +void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int); +void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime); void ERTS_CIO_EXPORT(erts_check_io)(int); +struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int); int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *); +void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp); #ifdef ERTS_ENABLE_LOCK_COUNT void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable); #endif #endif +/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */ +void +ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp) +{ + ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task); + erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time); + erts_atomic_set_relb(&itp->executed_time, ci_time); +} + + +struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr) +{ + ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers); + return &pollsetv[sched_nr - 1]; +} typedef struct { #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS SafeHashBucket hb; #endif ErtsSysFdType fd; + struct pollset_info *pollset; struct { ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */ ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */ @@ -209,6 +228,7 @@ static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd) { ErtsDrvEventState tmpl; tmpl.fd = fd; + tmpl.pollset = NULL; tmpl.driver.select = NULL; tmpl.driver.nif = NULL; tmpl.driver.stop.drv_ptr = NULL; @@ -252,10 +272,11 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*, ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST) static ERTS_INLINE void -init_iotask(ErtsIoTask *io_task) +init_iotask(ErtsIoTask *io_task, struct pollset_info* psi) { erts_port_task_handle_init(&io_task->task); erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0)); + io_task->pollset = psi; } static ERTS_INLINE int @@ -269,14 +290,14 @@ is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time) } static ERTS_INLINE ErtsDrvSelectDataState * -alloc_drv_select_data(void) +alloc_drv_select_data(struct pollset_info* psi) { ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE, sizeof(ErtsDrvSelectDataState)); dsp->inport = NIL; dsp->outport = NIL; - init_iotask(&dsp->iniotask); - init_iotask(&dsp->outiotask); + init_iotask(&dsp->iniotask, psi); + init_iotask(&dsp->outiotask, psi); return dsp; } @@ -307,11 +328,11 @@ free_nif_select_data(ErtsNifSelectDataState *dsp) } static ERTS_INLINE void -remember_removed(ErtsDrvEventState *state, struct pollset_info* psi) +remember_removed(ErtsDrvEventState *state) { struct removed_fd *fdlp; ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - if (erts_atomic_read_nob(&psi->in_poll_wait)) { + if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) { erts_aint_t was_next, exp_next; state->remove_cnt++; ASSERT(state->remove_cnt > 0); @@ -324,11 +345,11 @@ remember_removed(ErtsDrvEventState *state, struct pollset_info* psi) #endif /* Lockless atomic insertion in removed_list */ - was_next = erts_atomic_read_acqb(&psi->removed_list); + was_next = erts_atomic_read_acqb(&state->pollset->removed_list); do { exp_next = was_next; fdlp->next = (struct removed_fd*) exp_next; - was_next = erts_atomic_cmpxchg_mb(&psi->removed_list, + was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list, (erts_aint_t) fdlp, exp_next); }while (was_next != exp_next); @@ -384,7 +405,7 @@ forget_removed(struct pollset_info* psi) state->driver.stop.resource = NULL; ASSERT(resource); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; goto case_ERTS_EV_TYPE_NONE; case ERTS_EV_TYPE_STOP_USE: @@ -392,11 +413,12 @@ forget_removed(struct pollset_info* psi) drv_ptr = state->driver.stop.drv_ptr; ASSERT(drv_ptr); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.drv_ptr = NULL; /* Fall through */ case ERTS_EV_TYPE_NONE: case_ERTS_EV_TYPE_NONE: + state->pollset = NULL; #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash_erase_drv_ev_state(state); #endif @@ -455,6 +477,7 @@ grow_drv_ev_state(int min_ix) sizeof(ErtsDrvEventState)*new_len)); for (i = old_len; i < new_len; i++) { drv_ev_state.v[i].fd = (ErtsSysFdType) i; + drv_ev_state.v[i].pollset = NULL; drv_ev_state.v[i].driver.select = NULL; drv_ev_state.v[i].driver.stop.drv_ptr = NULL; drv_ev_state.v[i].driver.nif = NULL; @@ -541,7 +564,9 @@ deselect(ErtsDrvEventState *state, int mode) } } - state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake); + ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events, + 0, &do_wake); + state->events &= ~rm_events; if (!(state->events)) { switch (state->type) { @@ -565,8 +590,8 @@ deselect(ErtsDrvEventState *state, int mode) } state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - remember_removed(state, &pollset); + state->flags = 0; + remember_removed(state); } } @@ -585,7 +610,7 @@ check_fd_cleanup(ErtsDrvEventState *state, ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd))); - current_cio_time = erts_atomic_read_acqb(&erts_check_io_time); + current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time); *free_select = NULL; if (state->driver.select && (state->type != ERTS_EV_TYPE_DRV_SEL) @@ -602,19 +627,27 @@ check_fd_cleanup(ErtsDrvEventState *state, state->driver.nif = NULL; } -#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt + | (state->driver.nif != NULL) | (state->driver.select != NULL)) == 0) { + state->pollset = NULL; +#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS hash_erase_drv_ev_state(state); - - } #endif + } } +#ifdef __WIN32__ +# define MUST_DEFER(MAY_SLEEP) 1 +#else +# define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP) +#endif + static ERTS_INLINE int -check_cleanup_active_fd(ErtsSysFdType fd, +check_cleanup_active_fd(struct pollset_info* psi, + ErtsSysFdType fd, #if ERTS_CIO_DEFER_ACTIVE_EVENTS ErtsPollControlEntry *pce, int *pce_ix, @@ -637,14 +670,15 @@ check_cleanup_active_fd(ErtsSysFdType fd, state = &drv_ev_state.v[(int) fd]; #else state = hash_get_drv_ev_state(fd); /* may be NULL! */ - if (state) #endif + if (state && state->pollset == psi) { if (state->driver.select) { #if ERTS_CIO_DEFER_ACTIVE_EVENTS if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) { active = 1; - if ((state->events & ERTS_POLL_EV_IN) + if (MUST_DEFER(may_sleep) + && (state->events & ERTS_POLL_EV_IN) && !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) { evoff |= ERTS_POLL_EV_IN; state->flags |= ERTS_EV_FLAG_DEFER_IN_EV; @@ -657,15 +691,17 @@ check_cleanup_active_fd(ErtsSysFdType fd, } if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) { active = 1; - if ((state->events & ERTS_POLL_EV_OUT) + if (MUST_DEFER(may_sleep) + && (state->events & ERTS_POLL_EV_OUT) && !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) { evoff |= ERTS_POLL_EV_OUT; state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV; } } else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) { - if (state->events & ERTS_POLL_EV_OUT) + if (state->events & ERTS_POLL_EV_OUT) { evon |= ERTS_POLL_EV_OUT; + } state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV; } if (active) @@ -681,6 +717,12 @@ check_cleanup_active_fd(ErtsSysFdType fd, free_select = state->driver.select; state->driver.select = NULL; } +#if ERTS_CIO_DEFER_ACTIVE_EVENTS + if (evon) { + int do_wake = 0; + ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake); + } +#endif } if (state->driver.nif) { @@ -705,7 +747,7 @@ check_cleanup_active_fd(ErtsSysFdType fd, } if (rm_events) { int do_wake = 0; - state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, + state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events, 0, &do_wake); } if (state->events) @@ -720,7 +762,6 @@ check_cleanup_active_fd(ErtsSysFdType fd, if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0) hash_erase_drv_ev_state(state); #endif - } erts_mtx_unlock(mtx); @@ -737,28 +778,23 @@ check_cleanup_active_fd(ErtsSysFdType fd, pcep->events = evoff; pcep->on = 0; } - if (evon) { - ErtsPollControlEntry *pcep = &pce[(*pce_ix)++]; - pcep->fd = fd; - pcep->events = evon; - pcep->on = 1; - } #endif return active; } static void -check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) +check_cleanup_active_fds(struct pollset_info* psi, + erts_aint_t current_cio_time, int may_sleep) { - int six = pollset.active_fd.six; - int eix = pollset.active_fd.eix; - erts_aint32_t no = erts_atomic32_read_dirty(&pollset.active_fd.no); - int size = pollset.active_fd.size; + int six = psi->active_fd.six; + int eix = psi->active_fd.eix; + erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no); + const int size = psi->active_fd.size; int ix = six; #if ERTS_CIO_DEFER_ACTIVE_EVENTS - /* every fd might add two entries */ - Uint pce_sz = 2*sizeof(ErtsPollControlEntry)*no; + /* every fd might add one entry */ + Uint pce_sz = sizeof(ErtsPollControlEntry)*no; ErtsPollControlEntry *pctrl_entries = (pce_sz ? erts_alloc(ERTS_ALC_T_TMP, pce_sz) : NULL); @@ -766,12 +802,12 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) #endif while (ix != eix) { - ErtsSysFdType fd = pollset.active_fd.array[ix]; + ErtsSysFdType fd = psi->active_fd.array[ix]; int nix = ix + 1; if (nix >= size) nix = 0; ASSERT(fd != ERTS_SYS_FD_INVALID); - if (!check_cleanup_active_fd(fd, + if (!check_cleanup_active_fd(psi, fd, #if ERTS_CIO_DEFER_ACTIVE_EVENTS pctrl_entries, &pctrl_ix, @@ -781,14 +817,14 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) no--; if (ix == six) { #ifdef DEBUG - pollset.active_fd.array[ix] = ERTS_SYS_FD_INVALID; + psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID; #endif six = nix; } else { - pollset.active_fd.array[ix] = pollset.active_fd.array[six]; + psi->active_fd.array[ix] = psi->active_fd.array[six]; #ifdef DEBUG - pollset.active_fd.array[six] = ERTS_SYS_FD_INVALID; + psi->active_fd.array[six] = ERTS_SYS_FD_INVALID; #endif six++; if (six >= size) @@ -801,53 +837,53 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep) #if ERTS_CIO_DEFER_ACTIVE_EVENTS ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry)); if (pctrl_ix) - ERTS_CIO_POLL_CTLV(pollset.ps, pctrl_entries, pctrl_ix); + ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix); if (pctrl_entries) erts_free(ERTS_ALC_T_TMP, pctrl_entries); #endif - pollset.active_fd.six = six; - pollset.active_fd.eix = eix; - erts_atomic32_set_relb(&pollset.active_fd.no, no); + psi->active_fd.six = six; + psi->active_fd.eix = eix; + erts_atomic32_set_relb(&psi->active_fd.no, no); } -static void grow_active_fds(void) +static void grow_active_fds(struct pollset_info *psi) { - ASSERT(pollset.active_fd.six == pollset.active_fd.eix); - pollset.active_fd.six = 0; - pollset.active_fd.eix = pollset.active_fd.size; - pollset.active_fd.size += ERTS_ACTIVE_FD_INC; - pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR, - pollset.active_fd.array, - pollset.active_fd.size*sizeof(ErtsSysFdType)); + ASSERT(psi->active_fd.six == psi->active_fd.eix); + psi->active_fd.six = 0; + psi->active_fd.eix = psi->active_fd.size; + psi->active_fd.size += ERTS_ACTIVE_FD_INC; + psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR, + psi->active_fd.array, + psi->active_fd.size*sizeof(ErtsSysFdType)); #ifdef DEBUG { int i; - for (i = pollset.active_fd.eix + 1; i < pollset.active_fd.size; i++) - pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID; + for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++) + psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; } #endif } static ERTS_INLINE void -add_active_fd(ErtsSysFdType fd) +add_active_fd(struct pollset_info *psi, ErtsSysFdType fd) { - int eix = pollset.active_fd.eix; - int size = pollset.active_fd.size; + int eix = psi->active_fd.eix; + const int size = psi->active_fd.size; - pollset.active_fd.array[eix] = fd; + psi->active_fd.array[eix] = fd; - erts_atomic32_set_relb(&pollset.active_fd.no, - (erts_atomic32_read_dirty(&pollset.active_fd.no) + erts_atomic32_set_relb(&psi->active_fd.no, + (erts_atomic32_read_dirty(&psi->active_fd.no) + 1)); eix++; if (eix >= size) eix = 0; - pollset.active_fd.eix = eix; + psi->active_fd.eix = eix; - if (pollset.active_fd.six == eix) { - grow_active_fds(); + if (psi->active_fd.six == eix) { + grow_active_fds(psi); } } @@ -862,9 +898,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, Eterm id = erts_drvport2id(ix); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; - ErtsPollEvents new_events, old_events; + ErtsPollEvents old_events; ErtsDrvEventState *state; - int wake_poller; + int wake_poller = 0; int ret; ErtsDrvSelectDataState *free_select = NULL; ErtsNifSelectDataState *free_nif = NULL; @@ -898,22 +934,25 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, state = hash_get_drv_ev_state(fd); /* may be NULL! */ #endif - if (!on && (mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { - if (IS_FD_UNKNOWN(state)) { - /* fast track to stop_select callback */ - stop_select_fn = prt->drv_ptr->stop_select; -#ifdef USE_VM_PROBES - strncpy(name, prt->drv_ptr->name, - sizeof(DTRACE_CHARBUF_NAME(name))-1); - name[sizeof(name)-1] = '\0'; -#endif - ret = 0; - goto done_unknown; - } - mode |= (ERL_DRV_READ | ERL_DRV_WRITE); - wake_poller = 1; /* to eject fd from pollset (if needed) */ + if (!on) { + if (IS_FD_UNKNOWN(state)) { + if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { + /* fast track to stop_select callback */ + stop_select_fn = prt->drv_ptr->stop_select; + #ifdef USE_VM_PROBES + strncpy(name, prt->drv_ptr->name, + sizeof(DTRACE_CHARBUF_NAME(name))-1); + name[sizeof(name)-1] = '\0'; + #endif + } + ret = 0; + goto done_unknown; + } + else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { + mode |= (ERL_DRV_READ | ERL_DRV_WRITE); + wake_poller = 1; /* to eject fd from pollset (if needed) */ + } } - else wake_poller = 0; #ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS if (state == NULL) { @@ -951,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, if (owner != id && is_not_nil(owner)) drv_select_steal(ix, state, mode, on); } - ctl_events |= ERTS_POLL_EV_IN; + ctl_events = ERTS_POLL_EV_IN; } if (mode & ERL_DRV_WRITE) { if (state->type == ERTS_EV_TYPE_DRV_SEL) { @@ -962,44 +1001,61 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, ctl_events |= ERTS_POLL_EV_OUT; } + ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); - if (!on && !(state->flags & ERTS_EV_FLAG_USED) - && state->events && !(state->events & ~ctl_events)) { - /* Old driver removing all events. At least wake poller. - It will not make close() 100% safe but it will prevent - actions delayed by poll timeout. */ - wake_poller = 1; - } + old_events = state->events; - new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller); + if (on) { + ctl_events &= ~old_events; + state->events |= ctl_events; + } + else { + ctl_events &= old_events; + state->events &= ~ctl_events; - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (state->type == ERTS_EV_TYPE_DRV_SEL && !state->events) { - state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - state->driver.select->inport = NIL; - state->driver.select->outport = NIL; - } - ret = -1; - goto done; + if (!(state->flags & ERTS_EV_FLAG_USED) + && old_events && !state->events) { + /* + * Old driver removing all events. At least wake poller. + * It will not make close() 100% safe but it will prevent + * actions delayed by poll timeout. + */ + wake_poller = 1; + } } - old_events = state->events; + if (ctl_events) { + ErtsPollEvents new_events; - ASSERT(on - ? (new_events == (state->events | ctl_events)) - : (new_events == (state->events & ~ctl_events))); + if (!state->pollset) { + ErtsSchedulerData* esdp = erts_get_scheduler_data(); + ASSERT(esdp); + state->pollset = esdp->pollset; + } - ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL - || state->type == ERTS_EV_TYPE_NONE); + new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); - state->events = new_events; - if (ctl_events) { - if (on) { + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + state->driver.select->inport = NIL; + state->driver.select->outport = NIL; + } + ret = -1; + goto done; + } + + ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL + || state->type == ERTS_EV_TYPE_NONE); + } + + if (on) { + if (ctl_events) { if (!state->driver.select) - state->driver.select = alloc_drv_select_data(); + state->driver.select = alloc_drv_select_data(state->pollset); if (state->type == ERTS_EV_TYPE_NONE) state->type = ERTS_EV_TYPE_DRV_SEL; ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL); @@ -1010,8 +1066,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, if (mode & ERL_DRV_USE) { state->flags |= ERTS_EV_FLAG_USED; } - } - else { /* off */ + } + } + else { /* off */ if (state->type == ERTS_EV_TYPE_DRV_SEL) { if (ctl_events & ERTS_POLL_EV_IN) { abort_tasks(state, ERL_DRV_READ); @@ -1021,20 +1078,20 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, abort_tasks(state, ERL_DRV_WRITE); state->driver.select->outport = NIL; } - if (new_events == 0) { + if (state->events == 0) { if (old_events != 0) { - remember_removed(state, &pollset); + remember_removed(state); } if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) { state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; } /*else keep it, as fd will probably be selected upon again */ } } if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) { erts_driver_t* drv_ptr = prt->drv_ptr; - ASSERT(new_events==0); + ASSERT(state->events==0); if (state->remove_cnt == 0 || !wake_poller) { /* Safe to close fd now as it is not in pollset or there was no need to eject fd (kernel poll) */ @@ -1053,7 +1110,6 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix, } } } - } } ret = 0; @@ -1093,7 +1149,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ErtsResource* resource = DATA_TO_RESOURCE(obj); ErtsSysFdType fd = (ErtsSysFdType) e; ErtsPollEvents ctl_events = (ErtsPollEvents) 0; - ErtsPollEvents new_events, old_events; + ErtsPollEvents old_events; ErtsDrvEventState *state; int wake_poller; int ret; @@ -1192,32 +1248,45 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, ASSERT((state->type == ERTS_EV_TYPE_NIF) || (state->type == ERTS_EV_TYPE_NONE && !state->events)); - new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller); + old_events = state->events; - if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { - if (state->type == ERTS_EV_TYPE_NIF && !state->events) { - state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; - state->driver.nif->in.pid = NIL; - state->driver.nif->out.pid = NIL; - state->driver.nif->in.ddeselect_cnt = 0; - state->driver.nif->out.ddeselect_cnt = 0; - state->driver.stop.resource = NULL; - } - ret = INT_MIN | ERL_NIF_SELECT_FAILED; - goto done; + if (on) { + ctl_events &= ~old_events; + state->events |= ctl_events; + } + else { + ctl_events &= old_events; + state->events &= ~ctl_events; } - old_events = state->events; + if (ctl_events) { + ErtsPollEvents new_events; + + if (!state->pollset) { + state->pollset = erts_get_scheduler_data()->pollset; + } - ASSERT(on - ? (new_events == (state->events | ctl_events)) - : (new_events == (state->events & ~ctl_events))); + new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller); + + if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) { + if (state->type == ERTS_EV_TYPE_NIF && !old_events) { + state->type = ERTS_EV_TYPE_NONE; + state->flags = 0; + state->driver.nif->in.pid = NIL; + state->driver.nif->out.pid = NIL; + state->driver.nif->in.ddeselect_cnt = 0; + state->driver.nif->out.ddeselect_cnt = 0; + state->driver.stop.resource = NULL; + } + ret = INT_MIN | ERL_NIF_SELECT_FAILED; + goto done; + } + ASSERT(new_events == state->events); + } ASSERT(state->type == ERTS_EV_TYPE_NIF || state->type == ERTS_EV_TYPE_NONE); - state->events = new_events; if (on) { const Eterm recipient = pid ? pid->pid : env->proc->common.id; Uint32* refn; @@ -1230,7 +1299,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } ASSERT(state->type == ERTS_EV_TYPE_NIF); ASSERT(state->driver.stop.resource == resource); - if (ctl_events & ERTS_POLL_EV_IN) { + if (mode & ERL_DRV_READ) { state->driver.nif->in.pid = recipient; if (is_immed(ref)) { state->driver.nif->in.immed = ref; @@ -1244,7 +1313,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, } state->driver.nif->in.ddeselect_cnt = 0; } - if (ctl_events & ERTS_POLL_EV_OUT) { + if (mode & ERL_DRV_WRITE) { state->driver.nif->out.pid = recipient; if (is_immed(ref)) { state->driver.nif->out.immed = ref; @@ -1267,10 +1336,10 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env, state->driver.nif->in.ddeselect_cnt = 0; state->driver.nif->out.ddeselect_cnt = 0; if (old_events != 0) { - remember_removed(state, &pollset); + remember_removed(state); } } - ASSERT(new_events==0); + ASSERT(state->events==0); if (state->remove_cnt == 0 || !wake_poller) { /* * Safe to close fd now as it is not in pollset @@ -1567,7 +1636,7 @@ steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix, erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle); } state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.drv_ptr = NULL; } else { @@ -1606,7 +1675,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource, enif_release_resource(state->driver.stop.resource); state->type = ERTS_EV_TYPE_NONE; - state->flags &= ~ERTS_EV_FLAG_USED; + state->flags = 0; state->driver.stop.resource = NULL; } else { @@ -1656,7 +1725,7 @@ iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_READ); } - add_active_fd(state->fd); + add_active_fd(state->pollset, state->fd); } } @@ -1674,7 +1743,7 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time) (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); } - add_active_fd(state->fd); + add_active_fd(state->pollset, state->fd); } } @@ -1729,19 +1798,20 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource, static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport); void -ERTS_CIO_EXPORT(erts_check_io_interrupt)(int set) +ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set) { - ERTS_CIO_POLL_INTR(pollset.ps, set); + ERTS_CIO_POLL_INTR(psi->ps, set); } void -ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set, +ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi, + int set, ErtsMonotonicTime timeout_time) { - ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time); + ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time); } -#if !ERTS_CIO_DEFER_ACTIVE_EVENTS +#ifndef __WIN32__ /* * Number of ignored events, for a lingering fd added by enif_select(), * until we deselect fd-event from pollset. @@ -1761,8 +1831,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) int poll_ret, i; erts_aint_t current_cio_time; ErtsSchedulerData *esdp = erts_get_scheduler_data(); - - ASSERT(esdp); + struct pollset_info *psi = esdp->pollset; restart: @@ -1781,24 +1850,25 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) * erts_check_io_time, since only one thread can * check io at a time. */ - current_cio_time = erts_atomic_read_dirty(&erts_check_io_time); + current_cio_time = erts_atomic_read_dirty(&psi->check_io_time); current_cio_time++; - erts_atomic_set_relb(&erts_check_io_time, current_cio_time); + erts_atomic_set_relb(&psi->check_io_time, current_cio_time); - check_cleanup_active_fds(current_cio_time, + check_cleanup_active_fds(psi, + current_cio_time, timeout_time != ERTS_POLL_NO_TIMEOUT); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ #endif - pollres_len = erts_atomic32_read_dirty(&pollset.active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN; + pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN; pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len); - erts_atomic_set_nob(&pollset.in_poll_wait, 1); + erts_atomic_set_nob(&psi->in_poll_wait, 1); - poll_ret = ERTS_CIO_POLL_WAIT(pollset.ps, pollres, &pollres_len, timeout_time); + poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time); #ifdef ERTS_ENABLE_LOCK_CHECK erts_lc_check_exact(NULL, 0); /* No locks should be locked */ @@ -1810,8 +1880,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) #endif if (poll_ret != 0) { - erts_atomic_set_nob(&pollset.in_poll_wait, 0); - forget_removed(&pollset); + erts_atomic_set_nob(&psi->in_poll_wait, 0); + forget_removed(psi); erts_free(ERTS_ALC_T_TMP, pollres); if (poll_ret == EAGAIN) { goto restart; @@ -1838,6 +1908,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) erts_mtx_lock(fd_mtx(fd)); + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS state = &drv_ev_state.v[ (int) fd]; #else @@ -1848,7 +1919,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) #endif /* Skip this fd if it was removed from pollset */ - if (is_removed(state)) { + if (is_removed(state) || state->pollset != psi) { goto next_pollres; } @@ -1884,7 +1955,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) bad_fd_in_pollset(state, state->driver.select->inport, state->driver.select->outport); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } break; } @@ -1915,7 +1986,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) resource = state->driver.stop.resource; state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->out.pid = NIL; - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } else { ASSERT(state->driver.nif->out.ddeselect_cnt >= 2); @@ -1928,7 +1999,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) resource = state->driver.stop.resource; state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT; state->driver.nif->in.pid = NIL; - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } else { ASSERT(state->driver.nif->in.ddeselect_cnt >= 2); @@ -1938,7 +2009,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) } else if (revents & ERTS_POLL_EV_NVAL) { bad_fd_in_pollset(state, NIL, NIL); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); } erts_mtx_unlock(fd_mtx(fd)); @@ -1963,7 +2034,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) (int) state->type); ASSERT(0); deselect(state, 0); - add_active_fd(state->fd); + add_active_fd(psi, state->fd); break; } } @@ -1973,9 +2044,9 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait) next_pollres_unlocked:; } - erts_atomic_set_nob(&pollset.in_poll_wait, 0); + erts_atomic_set_nob(&psi->in_poll_wait, 0); erts_free(ERTS_ALC_T_TMP, pollres); - forget_removed(&pollset); + forget_removed(psi); } static void @@ -2096,16 +2167,17 @@ static void drv_ev_state_free(void *des) } #endif - #ifdef ERTS_ENABLE_KERNEL_POLL struct io_functions { int (*select)(ErlDrvPort, ErlDrvEvent, int, int); int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm); - void (*check_io_as_interrupt)(void); - void (*check_io_interrupt)(int); - void (*check_io_interrupt_tmd)(int, ErtsMonotonicTime); + void (*check_io_as_interrupt)(struct pollset_info*); + void (*check_io_interrupt)(struct pollset_info*, int); + void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime); void (*check_io)(int); + struct pollset_info* (*get_pollset)(int sched_nr); + void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp); int (*max_files)(void); Uint (*size)(void); Eterm (*info)(void *); @@ -2126,12 +2198,11 @@ extern struct io_functions erts_io_funcs; void ERTS_CIO_EXPORT(erts_init_check_io)(void) { + int j; ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED | ERL_NIF_SELECT_STOP_SCHEDULED | ERL_NIF_SELECT_INVALID_EVENT | ERL_NIF_SELECT_FAILED)) == 0); - erts_atomic_init_nob(&erts_check_io_time, 0); - erts_atomic_init_nob(&pollset.in_poll_wait, 0); #ifdef ERTS_ENABLE_KERNEL_POLL ASSERT(erts_io_funcs.select == NULL); @@ -2140,6 +2211,8 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void) erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt); erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed); erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io); + erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset); + erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed); erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files); erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size); erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info); @@ -2149,25 +2222,34 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void) #endif #endif + init_removed_fd_alloc(); + ERTS_CIO_POLL_INIT(); - pollset.ps = ERTS_CIO_NEW_POLLSET(); - - pollset.active_fd.six = 0; - pollset.active_fd.eix = 0; - erts_atomic32_init_nob(&pollset.active_fd.no, 0); - pollset.active_fd.size = ERTS_ACTIVE_FD_INC; - pollset.active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR, - sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC); + pollsetv = erts_alloc(ERTS_ALC_T_POLLSET, + sizeof(struct pollset_info)*erts_no_schedulers); + for (j=0; j < erts_no_schedulers; j++) { + struct pollset_info* psi = &pollsetv[j]; + + erts_atomic_init_nob(&psi->check_io_time, 0); + erts_atomic_init_nob(&psi->in_poll_wait, 0); + psi->ps = ERTS_CIO_NEW_POLLSET(); + psi->active_fd.six = 0; + psi->active_fd.eix = 0; + erts_atomic32_init_nob(&psi->active_fd.no, 0); + psi->active_fd.size = ERTS_ACTIVE_FD_INC; + psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR, + sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC); #ifdef DEBUG - { - int i; - for (i = 0; i < ERTS_ACTIVE_FD_INC; i++) - pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID; - } + { + int i; + for (i = 0; i < ERTS_ACTIVE_FD_INC; i++) + psi->active_fd.array[i] = ERTS_SYS_FD_INVALID; + } #endif - init_removed_fd_alloc(); - erts_atomic_init_nob(&pollset.removed_list, (erts_aint_t)NULL); + erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL); + } + { int i; for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) { @@ -2211,10 +2293,14 @@ ERTS_CIO_EXPORT(erts_check_io_max_files)(void) Uint ERTS_CIO_EXPORT(erts_check_io_size)(void) { - Uint res; + Uint res = 0; ErtsPollInfo pi; - ERTS_CIO_POLL_INFO(pollset.ps, &pi); - res = pi.memory_size; + int i; + + for (i = 0; i < erts_no_schedulers; i++) { + ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi); + res += pi.memory_size; + } #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS res += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); #else @@ -2235,105 +2321,122 @@ Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) { Process *p = (Process *) proc; - Eterm tags[16], values[16], res; - Uint sz, *szp, *hp, **hpp, memory_size; - Sint i; - ErtsPollInfo pi; - erts_aint_t cio_time = erts_atomic_read_acqb(&erts_check_io_time); - int active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no); - - while (1) { - erts_aint_t post_cio_time; - int post_active_fds; - - ERTS_CIO_POLL_INFO(pollset.ps, &pi); - - post_cio_time = erts_atomic_read_mb(&erts_check_io_time); - post_active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no); - if (cio_time == post_cio_time && active_fds == post_active_fds) - break; - cio_time = post_cio_time; - active_fds = post_active_fds; - } + Eterm tags[16], values[16], res, list = NIL; + Uint sz, *szp, *hp, **hpp; + ErtsPollInfo *piv; + Sint i, j; + + piv = erts_alloc(ERTS_ALC_T_TMP, + sizeof(ErtsPollInfo) * erts_no_schedulers); + + for (j = 0; j < erts_no_schedulers; j++) { + struct pollset_info *psi = &pollsetv[j]; + erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time); + + piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); + while (1) { + erts_aint_t post_cio_time; + int post_active_fds; + + ERTS_CIO_POLL_INFO(psi->ps, &piv[j]); + + post_cio_time = erts_atomic_read_mb(&psi->check_io_time); + post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no); + if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds) + break; + cio_time = post_cio_time; + piv[j].active_fds = post_active_fds; + } - memory_size = pi.memory_size; -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); -#else - memory_size += safe_hash_table_sz(&drv_ev_state.tab); - { - SafeHashInfo hi; - safe_hash_get_info(&hi, &drv_ev_state.tab); - memory_size += hi.objs * sizeof(ErtsDrvEventState); + #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS + piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len); + #else + piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab); + { + SafeHashInfo hi; + safe_hash_get_info(&hi, &drv_ev_state.tab); + piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState); + } + erts_spin_lock(&drv_ev_state.prealloc_lock); + piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); + erts_spin_unlock(&drv_ev_state.prealloc_lock); + #endif } - erts_spin_lock(&drv_ev_state.prealloc_lock); - memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState); - erts_spin_unlock(&drv_ev_state.prealloc_lock); -#endif hpp = NULL; szp = &sz; sz = 0; bld_it: - i = 0; - tags[i] = erts_bld_atom(hpp, szp, "name"); - values[i++] = erts_bld_atom(hpp, szp, "erts_poll"); + for (j = erts_no_schedulers-1; j >= 0; j--) { + i = 0; - tags[i] = erts_bld_atom(hpp, szp, "primary"); - values[i++] = erts_bld_atom(hpp, szp, pi.primary); + tags[i] = erts_bld_atom(hpp, szp, "name"); + values[i++] = erts_bld_atom(hpp, szp, "erts_poll"); - tags[i] = erts_bld_atom(hpp, szp, "fallback"); - values[i++] = erts_bld_atom(hpp, szp, pi.fallback ? pi.fallback : "false"); + tags[i] = erts_bld_atom(hpp, szp, "primary"); + values[i++] = erts_bld_atom(hpp, szp, piv[j].primary); - tags[i] = erts_bld_atom(hpp, szp, "kernel_poll"); - values[i++] = erts_bld_atom(hpp, szp, - pi.kernel_poll ? pi.kernel_poll : "false"); + tags[i] = erts_bld_atom(hpp, szp, "fallback"); + values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false"); - tags[i] = erts_bld_atom(hpp, szp, "memory_size"); - values[i++] = erts_bld_uint(hpp, szp, memory_size); + tags[i] = erts_bld_atom(hpp, szp, "kernel_poll"); + values[i++] = erts_bld_atom(hpp, szp, + piv[j].kernel_poll ? piv[j].kernel_poll : "false"); - tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.poll_set_size); + tags[i] = erts_bld_atom(hpp, szp, "memory_size"); + values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size); - if (pi.fallback) { - tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.fallback_poll_set_size); - } + tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size); + + if (piv[j].fallback) { + tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size); + } - tags[i] = erts_bld_atom(hpp, szp, "lazy_updates"); - values[i++] = pi.lazy_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "lazy_updates"); + values[i++] = piv[j].lazy_updates ? am_true : am_false; - if (pi.lazy_updates) { - tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.pending_updates); - } + if (piv[j].lazy_updates) { + tags[i] = erts_bld_atom(hpp, szp, "pending_updates"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates); + } - tags[i] = erts_bld_atom(hpp, szp, "batch_updates"); - values[i++] = pi.batch_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "batch_updates"); + values[i++] = piv[j].batch_updates ? am_true : am_false; - tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates"); - values[i++] = pi.concurrent_updates ? am_true : am_false; + tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates"); + values[i++] = piv[j].concurrent_updates ? am_true : am_false; - tags[i] = erts_bld_atom(hpp, szp, "max_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.max_fds); + tags[i] = erts_bld_atom(hpp, szp, "max_fds"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds); - tags[i] = erts_bld_atom(hpp, szp, "active_fds"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) active_fds); + tags[i] = erts_bld_atom(hpp, szp, "active_fds"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds); -#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_wakeups); + #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS + tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups); - tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_interrupts); + tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts); - tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed"); - values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_interrupt_timed); -#endif + tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed"); + values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed); + #endif - res = erts_bld_2tup_list(hpp, szp, i, tags, values); + res = erts_bld_2tup_list(hpp, szp, i, tags, values); + + if (!hpp) { + *szp += 2; + } + else { + list = CONS(*hpp, res, list); + *hpp += 2; + } + } if (!hpp) { hp = HAlloc(p, sz); @@ -2342,7 +2445,9 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc) goto bld_it; } - return res; + erts_free(ERTS_ALC_T_TMP, piv); + + return list; } static ERTS_INLINE ErtsPollEvents @@ -2373,6 +2478,23 @@ print_events(ErtsPollEvents ev) return ev; } +static ERTS_INLINE void +print_flags(EventStateFlags f) +{ + const char* delim = ""; + if(f & ERTS_EV_FLAG_USED) { + erts_printf("%s","USED"); + delim = "|"; + } + if(f & ERTS_EV_FLAG_DEFER_IN_EV) { + erts_printf("%s%s", delim, "DRIN"); + delim = "|"; + } + if(f & ERTS_EV_FLAG_DEFER_OUT_EV) { + erts_printf("%s%s", delim, "DROUT"); + } +} + typedef struct { int used_fds; int num_errors; @@ -2419,7 +2541,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) counters->used_fds++; #endif - erts_printf("fd=%d ", (int) fd); + erts_printf("pollset=%d fd=%d ", + (int)(state->pollset - pollsetv), (int) fd); #if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE) if (fstat((int) fd, &stat_buf) < 0) @@ -2497,7 +2620,15 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) err = 1; } else { - err = 1; + ErtsPollEvents ev = cio_events; +#if ERTS_CIO_DEFER_ACTIVE_EVENTS + if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV) + ev &= ~ERTS_POLL_EV_IN; + if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) + ev &= ~ERTS_POLL_EV_OUT; +#endif + if (ev != ep_events) + err = 1; erts_printf("cio_ev="); print_events(cio_events); erts_printf(" ep_ev="); @@ -2607,6 +2738,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters) #endif } + erts_printf(" flags="); print_flags(state->flags); + if (err) { counters->num_errors++; erts_printf(" ERROR"); @@ -2619,18 +2752,23 @@ int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) { #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - int fd, len; + int fd, len, i; #endif - IterDebugCounters counters; + IterDebugCounters counters = {0}; #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS ErtsDrvEventState null_des; + null_des.pollset = NULL; null_des.driver.select = NULL; null_des.driver.nif = NULL; null_des.driver.stop.drv_ptr = NULL; null_des.events = 0; null_des.remove_cnt = 0; null_des.type = ERTS_EV_TYPE_NONE; + null_des.flags = 0; + + counters.epep = erts_alloc(ERTS_ALC_T_TMP, + sizeof(ErtsPollEvents)*drv_ev_state.max_fds); #endif erts_printf("--- fds in pollset --------------------------------------\n"); @@ -2642,28 +2780,25 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip) erts_thr_progress_block(); /* stop the world to avoid messy locking */ #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS - counters.epep = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollEvents)*drv_ev_state.max_fds); - ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollset.ps, counters.epep, drv_ev_state.max_fds); - counters.internal_fds = 0; -#endif - counters.used_fds = 0; - counters.num_errors = 0; - counters.no_driver_select_structs = 0; - counters.no_enif_select_structs = 0; - -#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS len = erts_atomic_read_nob(&drv_ev_state.len); - for (fd = 0; fd < len; fd++) { - doit_erts_check_io_debug((void *) &drv_ev_state.v[fd], (void *) &counters); + + for (i = 0; i < erts_no_schedulers; i++) { + ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps, + counters.epep, + drv_ev_state.max_fds); + for (fd = 0; fd < len; fd++) { + if (drv_ev_state.v[fd].pollset == &pollsetv[i]) + doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters); + } } - for ( ; fd < drv_ev_state.max_fds; fd++) { - null_des.fd = fd; - doit_erts_check_io_debug((void *) &null_des, (void *) &counters); + for (fd = len ; fd < drv_ev_state.max_fds; fd++) { + null_des.fd = fd; + doit_erts_check_io_debug(&null_des, &counters); } #else - safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, (void *) &counters); + safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, + &counters); #endif - erts_thr_progress_unblock(); ciodip->no_used_fds = counters.used_fds; @@ -2696,7 +2831,6 @@ void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) { } #endif /* ERTS_ENABLE_LOCK_COUNT */ - #ifdef ERTS_ENABLE_KERNEL_POLL # ifdef ERTS_KERNEL_POLL_VERSION @@ -2724,6 +2858,16 @@ int enif_select(ErlNifEnv* env, ErlNifEvent event, return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref); } +struct pollset_info* erts_get_pollset(int sched_nr) +{ + return (*erts_io_funcs.get_pollset)(sched_nr); +} + +void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) +{ + erts_io_funcs.notify_port_task_executed(pthp); +} + void erts_check_io(int do_wait) { erts_io_funcs.check_io(do_wait); @@ -2751,15 +2895,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ip) return (*erts_io_funcs.check_io_debug)(ip); } -void erts_check_io_interrupt(int set) +void erts_check_io_interrupt(struct pollset_info* psi, int set) { - erts_io_funcs.check_io_interrupt(set); + erts_io_funcs.check_io_interrupt(psi, set); } -void erts_check_io_interrupt_timed(int set, +void erts_check_io_interrupt_timed(struct pollset_info* psi, int set, ErtsMonotonicTime timeout_time) { - erts_io_funcs.check_io_interrupt_tmd(set, timeout_time); + erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time); } #ifdef ERTS_ENABLE_LOCK_COUNT diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h index f4d7983002..ab53d91756 100644 --- a/erts/emulator/sys/common/erl_check_io.h +++ b/erts/emulator/sys/common/erl_check_io.h @@ -30,38 +30,29 @@ #include "sys.h" #include "erl_sys_driver.h" +struct pollset_info; + Uint erts_check_io_size(void); Eterm erts_check_io_info(void *); +void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp); +void erts_check_io_async_sig_interrupt(struct pollset_info *psi); int erts_check_io_max_files(void); -void erts_check_io_interrupt(int); -void erts_check_io_interrupt_timed(int, ErtsMonotonicTime); void erts_check_io(int); void erts_init_check_io(void); #ifdef ERTS_ENABLE_LOCK_COUNT void erts_lcnt_update_cio_locks(int enable); #endif -extern erts_atomic_t erts_check_io_time; +void erts_check_io_interrupt(struct pollset_info*, int); +void erts_check_io_interrupt_timed(struct pollset_info*, int, ErtsMonotonicTime); +struct pollset_info* erts_get_pollset(int sched_num); typedef struct { ErtsPortTaskHandle task; erts_atomic_t executed_time; + struct pollset_info *pollset; } ErtsIoTask; -ERTS_GLB_INLINE void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void -erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) -{ - ErtsIoTask *itp = (ErtsIoTask *) (((char *) pthp) - offsetof(ErtsIoTask, task)); - erts_aint_t ci_time = erts_atomic_read_acqb(&erts_check_io_time); - erts_atomic_set_relb(&itp->executed_time, ci_time); -} - -#endif - #endif /* ERL_CHECK_IO_H__ */ #if !defined(ERL_CHECK_IO_C__) && !defined(ERTS_ALLOC_C__) @@ -80,7 +71,7 @@ erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp) */ # define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 #else -# define ERTS_CIO_DEFER_ACTIVE_EVENTS 0 +# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1 #endif typedef struct { diff --git a/erts/emulator/sys/common/erl_poll.h b/erts/emulator/sys/common/erl_poll.h index 12dfc66e51..6c961205fe 100644 --- a/erts/emulator/sys/common/erl_poll.h +++ b/erts/emulator/sys/common/erl_poll.h @@ -225,6 +225,7 @@ typedef struct { long no_avoided_interrupts; long no_interrupt_timed; #endif + int active_fds; } ErtsPollInfo; void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet, diff --git a/erts/emulator/sys/common/erl_sys_common_misc.c b/erts/emulator/sys/common/erl_sys_common_misc.c index 09237c81ce..420138ff0a 100644 --- a/erts/emulator/sys/common/erl_sys_common_misc.c +++ b/erts/emulator/sys/common/erl_sys_common_misc.c @@ -45,14 +45,6 @@ #endif #endif -/* - * erts_check_io_time is used by the erl_check_io implementation. The - * global erts_check_io_time variable is declared here since there - * (often) exist two versions of erl_check_io (kernel-poll and - * non-kernel-poll), and we dont want two versions of this variable. - */ -erts_atomic_t erts_check_io_time; - /* Written once and only once */ static int filename_encoding = ERL_FILENAME_UNKNOWN; diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c index 09c515291a..acd7920e86 100644 --- a/erts/emulator/sys/unix/sys.c +++ b/erts/emulator/sys/unix/sys.c @@ -499,6 +499,7 @@ static void signal_notify_requested(Eterm type) { static ERTS_INLINE void break_requested(void) { + int i; /* * just set a flag - checked for and handled by * scheduler threads erts_check_io() (not signal handler). @@ -510,7 +511,10 @@ break_requested(void) erts_exit(ERTS_INTR_EXIT, ""); ERTS_SET_BREAK_REQUESTED; - erts_check_io_interrupt(1); + for (i=0; i < erts_no_schedulers; i++) { + /* Make sure we don't sleep in poll */ + erts_check_io_interrupt(ERTS_SCHEDULER_IX(i)->pollset, 1); + } } static RETSIGTYPE request_break(int signum) diff --git a/erts/emulator/sys/unix/sys_drivers.c b/erts/emulator/sys/unix/sys_drivers.c index 7c9a532fed..0228e1af54 100644 --- a/erts/emulator/sys/unix/sys_drivers.c +++ b/erts/emulator/sys/unix/sys_drivers.c @@ -1723,8 +1723,6 @@ static ErlDrvData forker_start(ErlDrvPort port_num, char* name, SET_NONBLOCKING(forker_fd); - driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1); - return (ErlDrvData)port_num; } @@ -1821,10 +1819,19 @@ static void forker_ready_output(ErlDrvData e, ErlDrvEvent fd) static ErlDrvSSizeT forker_control(ErlDrvData e, unsigned int cmd, char *buf, ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen) { + static int first_call = 1; ErtsSysForkerProto *proto = (ErtsSysForkerProto *)buf; ErlDrvPort port_num = (ErlDrvPort)e; int res; + if (first_call) { + /* + * Do driver_select here when schedulers and their pollsets have started. + */ + driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1); + first_call = 0; + } + driver_enq(port_num, buf, len); if (driver_sizeq(port_num) > sizeof(*proto)) { return 0; diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index c6d7f708be..332d11345b 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -83,6 +83,8 @@ -export([bin_prefix/2]). +-export([get_check_io_total/1]). % for z_SUITE.erl + -include_lib("common_test/include/ct.hrl"). @@ -783,7 +785,7 @@ io_ready_exit(Config) when is_list(Config) -> use_fallback_pollset(Config) when is_list(Config) -> FlbkFun = fun () -> - ChkIoDuring = erlang:system_info(check_io), + ChkIoDuring = get_check_io_total(erlang:system_info(check_io)), case lists:keysearch(fallback_poll_set_size, 1, ChkIoDuring) of @@ -939,7 +941,7 @@ chkio_test({erts_poll_info, Before}, "ok" -> chk_chkio_port(Port), Fun(), - During = erlang:system_info(check_io), + During = get_check_io_total(erlang:system_info(check_io)), erlang:display(During), 0 = element(1, erts_debug:get_internal_state(check_io_debug)), io:format("During test: ~p~n", [During]), @@ -992,14 +994,14 @@ verify_chkio_state(Before, After) -> ok. get_stable_check_io_info() -> - ChkIo = erlang:system_info(check_io), - PendUpdNo = case lists:keysearch(pending_updates, 1, ChkIo) of - {value, {pending_updates, PendNo}} -> - PendNo; + ChkIo = get_check_io_total(erlang:system_info(check_io)), + PendUpdNo = case lists:keyfind(pending_updates, 1, ChkIo) of + {pending_updates, Value} -> + Value; false -> 0 end, - {value, {active_fds, ActFds}} = lists:keysearch(active_fds, 1, ChkIo), + {active_fds, ActFds} = lists:keyfind(active_fds, 1, ChkIo), case {PendUpdNo, ActFds} of {0, 0} -> ChkIo; @@ -1008,6 +1010,46 @@ get_stable_check_io_info() -> get_stable_check_io_info() end. +%% Merge return from erlang:system_info(check_io) +%% as if it was one big pollset. +get_check_io_total(ChkIo) -> + lists:foldl(fun(Pollset, Acc) -> + lists:zipwith(fun(A, B) -> + add_pollset_infos(A,B) + end, + Pollset, Acc) + end, + hd(ChkIo), + tl(ChkIo)). + +add_pollset_infos({Tag, A}=TA , {Tag, B}=TB) -> + case tag_type(Tag) of + sum -> + {Tag, A + B}; + const -> + case A of + B -> TA; + _ -> + ct:fail("Unexpected diff in pollsets ~p != ~p", + [TA,TB]) + end + end. + +tag_type(name) -> const; +tag_type(primary) -> const; +tag_type(fallback) -> const; +tag_type(kernel_poll) -> const; +tag_type(memory_size) -> sum; +tag_type(total_poll_set_size) -> sum; +tag_type(fallback_poll_set_size) -> sum; +tag_type(lazy_updates) -> const; +tag_type(pending_updates) -> sum; +tag_type(batch_updates) -> const; +tag_type(concurrent_updates) -> const; +tag_type(max_fds) -> const; +tag_type(active_fds) -> sum. + + %% Missed port lock when stealing control of fd from a %% driver that didn't use the same lock. The lock checker %% used to trigger on this and dump core. diff --git a/erts/emulator/test/z_SUITE.erl b/erts/emulator/test/z_SUITE.erl index feea7432a9..ac3df8bfbf 100644 --- a/erts/emulator/test/z_SUITE.erl +++ b/erts/emulator/test/z_SUITE.erl @@ -330,7 +330,7 @@ display_check_io(ChkIo) -> ok. get_check_io_info() -> - ChkIo = erlang:system_info(check_io), + ChkIo = driver_SUITE:get_check_io_total(erlang:system_info(check_io)), PendUpdNo = case lists:keysearch(pending_updates, 1, ChkIo) of {value, {pending_updates, PendNo}} -> PendNo; |