aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/erl_port_task.c37
-rw-r--r--erts/emulator/beam/erl_port_task.h22
-rw-r--r--erts/emulator/beam/erl_process.c144
-rw-r--r--erts/emulator/beam/erl_process.h3
-rw-r--r--erts/emulator/beam/erl_vm.h2
-rw-r--r--erts/emulator/sys/common/erl_check_io.c734
-rw-r--r--erts/emulator/sys/common/erl_check_io.h27
-rw-r--r--erts/emulator/sys/common/erl_poll.h1
-rw-r--r--erts/emulator/sys/common/erl_sys_common_misc.c8
-rw-r--r--erts/emulator/sys/unix/sys.c6
-rw-r--r--erts/emulator/sys/unix/sys_drivers.c11
-rw-r--r--erts/emulator/test/driver_SUITE.erl56
-rw-r--r--erts/emulator/test/z_SUITE.erl2
13 files changed, 543 insertions, 510 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index bdce811847..141e815d1f 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -91,8 +91,6 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q
erts_atomic_read_nob(&(PP)->run_queue))); \
} while (0)
-erts_atomic_t erts_port_task_outstanding_io_tasks;
-
#define ERTS_PT_STATE_SCHEDULED 0
#define ERTS_PT_STATE_ABORTED 1
#define ERTS_PT_STATE_EXECUTING 2
@@ -584,9 +582,12 @@ static ERTS_INLINE void
reset_executed_io_task_handle(ErtsPortTask *ptp)
{
if (ptp->u.alive.handle) {
+ ErtsIoTask *itp = ErtsContainerStruct(ptp->u.alive.handle, ErtsIoTask, task);
+
ASSERT(ptp == handle2task(ptp->u.alive.handle));
erts_io_notify_port_task_executed(ptp->u.alive.handle);
reset_port_task_handle(ptp->u.alive.handle);
+ erts_check_io_interrupt(itp->pollset, 1);
}
}
@@ -1308,20 +1309,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
if (old_state != ERTS_PT_STATE_SCHEDULED)
res = - 1; /* Task already aborted, executing, or executed */
else {
-
reset_port_task_handle(pthp);
-
- switch (ptp->type) {
- case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- ASSERT(erts_atomic_read_nob(
- &erts_port_task_outstanding_io_tasks) > 0);
- erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
- break;
- default:
- break;
- }
-
res = 0;
}
}
@@ -1458,7 +1446,6 @@ erts_port_task_schedule(Eterm id,
va_start(argp, type);
ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
va_end(argp);
- erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
@@ -1634,13 +1621,12 @@ erts_port_task_free_port(Port *pp)
* scheduling of processes. Run-queue lock should be held by caller.
*/
-int
+void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
ErtsPortTask *execq;
int processing_busy_q;
- int res = 0;
int vreds = 0;
int reds = 0;
erts_aint_t io_tasks_executed = 0;
@@ -1655,7 +1641,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
pp = pop_port(runq);
if (!pp) {
- res = 0;
goto done;
}
@@ -1822,14 +1807,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
-
- if (io_tasks_executed) {
- ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- >= io_tasks_executed);
- erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
- -1*io_tasks_executed);
- }
-
ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue));
active = finalize_exec(pp, &execq, processing_busy_q);
@@ -1870,15 +1847,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
}
done:
- res = (erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
runq->scheduler->reductions += reds;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
-
- return res;
}
static void
@@ -2123,8 +2096,6 @@ erts_dequeue_port(ErtsRunQueue *rq)
void
erts_port_task_init(void)
{
- erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
- (erts_aint_t) 0);
init_port_task_alloc();
init_busy_caller_table_alloc();
}
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index ffd42c9bab..ce63e4a7c0 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -61,11 +61,6 @@ typedef enum {
ERTS_PORT_TASK_PROC_SIG
} ErtsPortTaskType;
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-/* NOTE: Do not access any of the exported variables directly */
-extern erts_atomic_t erts_port_task_outstanding_io_tasks;
-#endif
-
#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
@@ -139,10 +134,6 @@ ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp);
ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp);
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
-#endif
-
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
@@ -220,21 +211,10 @@ erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp)
erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING);
}
-#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-
-ERTS_GLB_INLINE int
-erts_port_task_have_outstanding_io_tasks(void)
-{
- return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
- != 0);
-}
-
-#endif /* ERTS_INCLUDE_SCHEDULER_INTERNALS */
-
#endif
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
-int erts_port_task_execute(ErtsRunQueue *, Port **);
+void erts_port_task_execute(ErtsRunQueue *, Port **);
void erts_port_task_init(void);
#endif
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 5d33c9f6d6..2a34410d5b 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -365,9 +365,6 @@ erts_sched_stat_t erts_sched_stat;
static erts_tsd_key_t ERTS_WRITE_UNLIKELY(sched_data_key);
-static erts_atomic32_t function_calls;
-
-static erts_atomic32_t doing_sys_schedule;
static erts_atomic32_t no_empty_run_queues;
long erts_runq_supervision_interval = 0;
static ethr_event runq_supervision_event;
@@ -1558,18 +1555,19 @@ erts_psd_set_init(Process *p, int ix, void *data)
void
-erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi, erts_aint32_t flags)
+erts_sched_finish_poke(ErtsSchedulerSleepInfo *ssi,
+ erts_aint32_t flags)
{
switch (flags & ERTS_SSI_FLGS_SLEEP_TYPE) {
case ERTS_SSI_FLG_POLL_SLEEPING:
- erts_check_io_interrupt(1);
+ erts_check_io_interrupt(ssi->esdp->pollset, 1);
break;
case ERTS_SSI_FLG_POLL_SLEEPING|ERTS_SSI_FLG_TSE_SLEEPING:
/*
* Thread progress blocking while poll sleeping; need
* to signal on both...
*/
- erts_check_io_interrupt(1);
+ erts_check_io_interrupt(ssi->esdp->pollset, 1);
/* fall through */
case ERTS_SSI_FLG_TSE_SLEEPING:
erts_tse_set(ssi->event);
@@ -2869,50 +2867,14 @@ erts_active_schedulers(void)
return as;
}
-
-static ERTS_INLINE void
-clear_sys_scheduling(void)
-{
- erts_atomic32_set_mb(&doing_sys_schedule, 0);
-}
-
-static ERTS_INLINE int
-try_set_sys_scheduling(void)
-{
- return 0 == erts_atomic32_cmpxchg_acqb(&doing_sys_schedule, 1, 0);
-}
-
-
static ERTS_INLINE int
prepare_for_sys_schedule(int non_blocking)
{
- if (non_blocking && erts_eager_check_io) {
- return try_set_sys_scheduling();
- }
- else {
- while (!erts_port_task_have_outstanding_io_tasks()
- && try_set_sys_scheduling()) {
- if (!erts_port_task_have_outstanding_io_tasks())
- return 1;
- clear_sys_scheduling();
- }
- return 0;
- }
+ return 1;
}
static ERTS_INLINE void
-sched_change_waiting_sys_to_waiting(Uint no, ErtsRunQueue *rq)
-{
- ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
-
- ASSERT(!ERTS_RUNQ_IX_IS_DIRTY(rq->ix));
-
- ASSERT(rq->waiting < 0);
- rq->waiting *= -1;
-}
-
-static ERTS_INLINE void
sched_waiting(Uint no, ErtsRunQueue *rq)
{
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
@@ -3083,7 +3045,7 @@ sched_set_sleeptype(ErtsSchedulerSleepInfo *ssi, erts_aint32_t sleep_type)
erts_tse_reset(ssi->event);
else {
ASSERT(sleep_type == ERTS_SSI_FLG_POLL_SLEEPING);
- erts_check_io_interrupt(0);
+ erts_check_io_interrupt(ssi->esdp->pollset, 0);
}
while (1) {
@@ -3272,8 +3234,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
spincount = sched_busy_wait.tse;
- tse_wait:
-
if (ERTS_SCHEDULER_IS_DIRTY(esdp))
dirty_sched_wall_time_change(esdp, working = 0);
else if (thr_prgr_active != working)
@@ -3402,7 +3362,7 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
else
{
- erts_atomic32_set_relb(&function_calls, 0);
+ esdp->function_calls = 0;
*fcalls = 0;
ASSERT(!ERTS_SCHEDULER_IS_DIRTY(esdp));
@@ -3428,7 +3388,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO);
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
LTTNG2(scheduler_poll, esdp->no, 1);
erl_sys_schedule(1); /* Might give us something to do */
@@ -3459,48 +3418,10 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
-
- /*
- * If we got new I/O tasks we aren't allowed to
- * call erl_sys_schedule() until it is handled.
- */
- if (erts_port_task_have_outstanding_io_tasks()) {
- clear_sys_scheduling();
- /*
- * Got to check that we still got I/O tasks; otherwise
- * we have to continue checking for I/O...
- */
- if (!prepare_for_sys_schedule(0)) {
- spincount *= ERTS_SCHED_TSE_SLEEP_SPINCOUNT_FACT;
- goto tse_wait;
- }
- }
}
erts_runq_lock(rq);
- /*
- * If we got new I/O tasks we aren't allowed to
- * sleep in erl_sys_schedule().
- */
- if (erts_port_task_have_outstanding_io_tasks()) {
- clear_sys_scheduling();
-
- /*
- * Got to check that we still got I/O tasks; otherwise
- * we have to wait in erl_sys_schedule() after all...
- */
- if (!prepare_for_sys_schedule(0)) {
- /*
- * Not allowed to wait in erl_sys_schedule;
- * do tse wait instead...
- */
- sched_change_waiting_sys_to_waiting(esdp->no, rq);
- erts_runq_unlock(rq);
- spincount = 0;
- goto tse_wait;
- }
- }
if (aux_work) {
erts_runq_unlock(rq);
goto sys_poll_aux_work;
@@ -3517,7 +3438,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
ASSERT(!(flgs & ERTS_SSI_FLG_SLEEPING));
goto sys_woken;
}
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
goto sys_poll_aux_work;
}
@@ -3532,8 +3452,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
if (thr_prgr_active)
erts_thr_progress_active(esdp, thr_prgr_active = 0);
- ASSERT(!erts_port_task_have_outstanding_io_tasks());
-
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_CHECK_IO);
LTTNG2(scheduler_poll, esdp->no, 0);
@@ -3561,7 +3479,6 @@ scheduler_wait(int *fcalls, ErtsSchedulerData *esdp, ErtsRunQueue *rq)
erts_thr_progress_active(esdp, thr_prgr_active = 1);
erts_runq_lock(rq);
}
- clear_sys_scheduling();
if (flgs & ~(ERTS_SSI_FLG_SUSPENDED|ERTS_SSI_FLG_MSB_EXEC))
erts_atomic32_read_band_nob(&ssi->flags,
(ERTS_SSI_FLG_SUSPENDED
@@ -5840,6 +5757,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->run_queue = runq;
if (ERTS_RUNQ_IX_IS_DIRTY(runq->ix)) {
esdp->no = 0;
+ esdp->pollset = NULL;
if (runq == ERTS_DIRTY_CPU_RUNQ)
esdp->type = ERTS_SCHED_DIRTY_CPU;
else {
@@ -5859,6 +5777,7 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->type = ERTS_SCHED_NORMAL;
esdp->no = (Uint) num;
esdp->dirty_no = 0;
+ esdp->pollset = erts_get_pollset(num);
runq->scheduler = esdp;
}
esdp->dirty_shadow_process = shadow_proc;
@@ -5871,6 +5790,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
shadow_proc->static_flags = ERTS_STC_FLG_SHADOW_PROC;
}
+ esdp->function_calls = 0;
+
+ ssi->esdp = esdp;
esdp->ssi = ssi;
esdp->current_process = NULL;
esdp->current_port = NULL;
@@ -6204,13 +6126,9 @@ erts_init_scheduling(int no_schedulers, int no_schedulers_online
erts_atomic32_set_nob(&schdlr_sspnd.changing,
set_schdlr_sspnd_change_flags);
- erts_atomic32_init_nob(&doing_sys_schedule, 0);
-
init_misc_aux_work();
- erts_atomic32_init_nob(&function_calls, 0);
-
/* init port tasks */
erts_port_task_init();
@@ -7379,7 +7297,7 @@ msb_scheduler_type_switch(ErtsSchedType sched_type,
calls = ERTS_MODIFIED_TIMING_INPUT_REDS + 1;
else
calls = INPUT_REDUCTIONS + 1;
- erts_atomic32_set_nob(&function_calls, calls);
+ esdp->function_calls = calls;
if ((nrml_prio == ERTS_MSB_NONE_PRIO_BIT)
& ((dcpu_prio != ERTS_MSB_NONE_PRIO_BIT)
@@ -9769,7 +9687,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
}
rq = erts_get_runq_current(esdp);
ASSERT(esdp);
- fcalls = (int) erts_atomic32_read_acqb(&function_calls);
+ fcalls = esdp->function_calls;
actual_reds = reds = 0;
erts_runq_lock(rq);
} else {
@@ -9795,7 +9713,8 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST;
esdp->virtual_reds = 0;
- fcalls = (int) erts_atomic32_add_read_acqb(&function_calls, reds);
+ esdp->function_calls += reds;
+ fcalls = esdp->function_calls;
ASSERT(esdp && esdp == erts_get_scheduler_data());
rq = erts_get_runq_current(esdp);
@@ -10044,11 +9963,11 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
ERTS_MSACC_PUSH_STATE_CACHED_M();
- erts_atomic32_set_relb(&function_calls, 0);
+ esdp->function_calls = 0;
fcalls = 0;
#if 0 /* Not needed since we wont wait in sys schedule */
- erts_check_io_interrupt(0);
+ erts_check_io_interrupt(esdp->pollset, 0);
#endif
erts_runq_unlock(rq);
@@ -10063,7 +9982,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
erts_bump_timers(esdp->timer_wheel, current_time);
erts_runq_lock(rq);
- clear_sys_scheduling();
goto continue_check_activities_to_run;
}
@@ -10079,29 +9997,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
flags = ERTS_RUNQ_FLGS_GET_NOB(rq);
if (flags & PORT_BIT) {
- int have_outstanding_io;
- have_outstanding_io = erts_port_task_execute(rq, &esdp->current_port);
- if ((!erts_eager_check_io
- && have_outstanding_io
- && fcalls > 2*input_reductions)
- || (flags & ERTS_RUNQ_FLG_HALTING)) {
- /*
- * If we have performed more than 2*INPUT_REDUCTIONS since
- * last call to erl_sys_schedule() and we still haven't
- * handled all I/O tasks we stop running processes and
- * focus completely on ports.
- *
- * One could argue that this is a strange behavior. The
- * reason for doing it this way is that it is similar
- * to the behavior before port tasks were introduced.
- * We don't want to change the behavior too much, at
- * least not at the time of writing. This behavior
- * might change in the future.
- *
- * /rickard
- */
- goto check_activities_to_run;
- }
+ erts_port_task_execute(rq, &esdp->current_port);
+ if (flags & ERTS_RUNQ_FLG_HALTING)
+ goto check_activities_to_run;
}
/*
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index e63da2d9db..b372146846 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -364,6 +364,7 @@ typedef struct {
} ErtsSchedulerSleepList;
struct ErtsSchedulerSleepInfo_ {
+ struct ErtsSchedulerData_ *esdp;
ErtsSchedulerSleepInfo *next;
ErtsSchedulerSleepInfo *prev;
erts_atomic32_t flags;
@@ -631,6 +632,8 @@ struct ErtsSchedulerData_ {
void *match_pseudo_process; /* erl_db_util.c:db_prog_match() */
Process *free_process;
ErtsThrPrgrData thr_progress_data;
+ struct pollset_info* pollset;
+ int function_calls;
ErtsSchedulerSleepInfo *ssi;
Process *current_process;
ErtsSchedType type;
diff --git a/erts/emulator/beam/erl_vm.h b/erts/emulator/beam/erl_vm.h
index 076767c7cd..82977e62ea 100644
--- a/erts/emulator/beam/erl_vm.h
+++ b/erts/emulator/beam/erl_vm.h
@@ -46,7 +46,7 @@
*/
#define ERTS_X_REGS_ALLOCATED (MAX_REG+3)
-#define INPUT_REDUCTIONS (2 * CONTEXT_REDS)
+#define INPUT_REDUCTIONS (CONTEXT_REDS / 4)
#define H_DEFAULT_SIZE 233 /* default (heap + stack) min size */
#define VH_DEFAULT_SIZE 32768 /* default virtual (bin) heap min size (words) */
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 52584e4246..ff0f3ea121 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -94,6 +94,7 @@ static struct pollset_info
{
ErtsPollSet ps;
erts_atomic_t in_poll_wait; /* set while doing poll */
+ erts_atomic_t check_io_time;
struct {
int six; /* start index */
int eix; /* end index */
@@ -102,7 +103,7 @@ static struct pollset_info
ErtsSysFdType *array;
} active_fd;
erts_atomic_t removed_list; /* struct removed_fd* */
-}pollset;
+}*pollsetv;
#define NUM_OF_POLLSETS 1
@@ -114,21 +115,39 @@ int ERTS_CIO_EXPORT(enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags
Uint ERTS_CIO_EXPORT(erts_check_io_size)(void);
Eterm ERTS_CIO_EXPORT(erts_check_io_info)(void *);
int ERTS_CIO_EXPORT(erts_check_io_max_files)(void);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt)(int);
-void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int, ErtsMonotonicTime);
+void ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info*, int);
+void ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info*, int, ErtsMonotonicTime);
void ERTS_CIO_EXPORT(erts_check_io)(int);
+struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int);
int ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *);
+void ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp);
#ifdef ERTS_ENABLE_LOCK_COUNT
void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable);
#endif
#endif
+/* ToDo: Was inline in erl_check_io.h but now need struct pollset_info */
+void
+ERTS_CIO_EXPORT(erts_io_notify_port_task_executed)(ErtsPortTaskHandle *pthp)
+{
+ ErtsIoTask *itp = ErtsContainerStruct(pthp, ErtsIoTask, task);
+ erts_aint_t ci_time = erts_atomic_read_acqb(&itp->pollset->check_io_time);
+ erts_atomic_set_relb(&itp->executed_time, ci_time);
+}
+
+
+struct pollset_info* ERTS_CIO_EXPORT(erts_get_pollset)(int sched_nr)
+{
+ ASSERT(sched_nr > 0 && sched_nr <= erts_no_schedulers);
+ return &pollsetv[sched_nr - 1];
+}
typedef struct {
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
SafeHashBucket hb;
#endif
ErtsSysFdType fd;
+ struct pollset_info *pollset;
struct {
ErtsDrvSelectDataState *select; /* ERTS_EV_TYPE_DRV_SEL */
ErtsNifSelectDataState *nif; /* ERTS_EV_TYPE_NIF */
@@ -209,6 +228,7 @@ static ERTS_INLINE ErtsDrvEventState* hash_new_drv_ev_state(ErtsSysFdType fd)
{
ErtsDrvEventState tmpl;
tmpl.fd = fd;
+ tmpl.pollset = NULL;
tmpl.driver.select = NULL;
tmpl.driver.nif = NULL;
tmpl.driver.stop.drv_ptr = NULL;
@@ -252,10 +272,11 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource*,
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(removed_fd, struct removed_fd, 64, ERTS_ALC_T_FD_LIST)
static ERTS_INLINE void
-init_iotask(ErtsIoTask *io_task)
+init_iotask(ErtsIoTask *io_task, struct pollset_info* psi)
{
erts_port_task_handle_init(&io_task->task);
erts_atomic_init_nob(&io_task->executed_time, ~((erts_aint_t) 0));
+ io_task->pollset = psi;
}
static ERTS_INLINE int
@@ -269,14 +290,14 @@ is_iotask_active(ErtsIoTask *io_task, erts_aint_t current_cio_time)
}
static ERTS_INLINE ErtsDrvSelectDataState *
-alloc_drv_select_data(void)
+alloc_drv_select_data(struct pollset_info* psi)
{
ErtsDrvSelectDataState *dsp = erts_alloc(ERTS_ALC_T_DRV_SEL_D_STATE,
sizeof(ErtsDrvSelectDataState));
dsp->inport = NIL;
dsp->outport = NIL;
- init_iotask(&dsp->iniotask);
- init_iotask(&dsp->outiotask);
+ init_iotask(&dsp->iniotask, psi);
+ init_iotask(&dsp->outiotask, psi);
return dsp;
}
@@ -307,11 +328,11 @@ free_nif_select_data(ErtsNifSelectDataState *dsp)
}
static ERTS_INLINE void
-remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
+remember_removed(ErtsDrvEventState *state)
{
struct removed_fd *fdlp;
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- if (erts_atomic_read_nob(&psi->in_poll_wait)) {
+ if (erts_atomic_read_nob(&state->pollset->in_poll_wait)) {
erts_aint_t was_next, exp_next;
state->remove_cnt++;
ASSERT(state->remove_cnt > 0);
@@ -324,11 +345,11 @@ remember_removed(ErtsDrvEventState *state, struct pollset_info* psi)
#endif
/* Lockless atomic insertion in removed_list */
- was_next = erts_atomic_read_acqb(&psi->removed_list);
+ was_next = erts_atomic_read_acqb(&state->pollset->removed_list);
do {
exp_next = was_next;
fdlp->next = (struct removed_fd*) exp_next;
- was_next = erts_atomic_cmpxchg_mb(&psi->removed_list,
+ was_next = erts_atomic_cmpxchg_mb(&state->pollset->removed_list,
(erts_aint_t) fdlp,
exp_next);
}while (was_next != exp_next);
@@ -384,7 +405,7 @@ forget_removed(struct pollset_info* psi)
state->driver.stop.resource = NULL;
ASSERT(resource);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
goto case_ERTS_EV_TYPE_NONE;
case ERTS_EV_TYPE_STOP_USE:
@@ -392,11 +413,12 @@ forget_removed(struct pollset_info* psi)
drv_ptr = state->driver.stop.drv_ptr;
ASSERT(drv_ptr);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.drv_ptr = NULL;
/* Fall through */
case ERTS_EV_TYPE_NONE:
case_ERTS_EV_TYPE_NONE:
+ state->pollset = NULL;
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
#endif
@@ -455,6 +477,7 @@ grow_drv_ev_state(int min_ix)
sizeof(ErtsDrvEventState)*new_len));
for (i = old_len; i < new_len; i++) {
drv_ev_state.v[i].fd = (ErtsSysFdType) i;
+ drv_ev_state.v[i].pollset = NULL;
drv_ev_state.v[i].driver.select = NULL;
drv_ev_state.v[i].driver.stop.drv_ptr = NULL;
drv_ev_state.v[i].driver.nif = NULL;
@@ -541,7 +564,9 @@ deselect(ErtsDrvEventState *state, int mode)
}
}
- state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, rm_events, 0, &do_wake);
+ ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, rm_events,
+ 0, &do_wake);
+ state->events &= ~rm_events;
if (!(state->events)) {
switch (state->type) {
@@ -565,8 +590,8 @@ deselect(ErtsDrvEventState *state, int mode)
}
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- remember_removed(state, &pollset);
+ state->flags = 0;
+ remember_removed(state);
}
}
@@ -585,7 +610,7 @@ check_fd_cleanup(ErtsDrvEventState *state,
ERTS_LC_ASSERT(erts_lc_mtx_is_locked(fd_mtx(state->fd)));
- current_cio_time = erts_atomic_read_acqb(&erts_check_io_time);
+ current_cio_time = erts_atomic_read_acqb(&state->pollset->check_io_time);
*free_select = NULL;
if (state->driver.select
&& (state->type != ERTS_EV_TYPE_DRV_SEL)
@@ -602,19 +627,27 @@ check_fd_cleanup(ErtsDrvEventState *state,
state->driver.nif = NULL;
}
-#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (((state->type != ERTS_EV_TYPE_NONE)
| state->remove_cnt
+ | (state->driver.nif != NULL)
| (state->driver.select != NULL)) == 0) {
+ state->pollset = NULL;
+#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
hash_erase_drv_ev_state(state);
-
- }
#endif
+ }
}
+#ifdef __WIN32__
+# define MUST_DEFER(MAY_SLEEP) 1
+#else
+# define MUST_DEFER(MAY_SLEEP) (MAY_SLEEP)
+#endif
+
static ERTS_INLINE int
-check_cleanup_active_fd(ErtsSysFdType fd,
+check_cleanup_active_fd(struct pollset_info* psi,
+ ErtsSysFdType fd,
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
ErtsPollControlEntry *pce,
int *pce_ix,
@@ -637,14 +670,15 @@ check_cleanup_active_fd(ErtsSysFdType fd,
state = &drv_ev_state.v[(int) fd];
#else
state = hash_get_drv_ev_state(fd); /* may be NULL! */
- if (state)
#endif
+ if (state && state->pollset == psi)
{
if (state->driver.select) {
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
if (is_iotask_active(&state->driver.select->iniotask, current_cio_time)) {
active = 1;
- if ((state->events & ERTS_POLL_EV_IN)
+ if (MUST_DEFER(may_sleep)
+ && (state->events & ERTS_POLL_EV_IN)
&& !(state->flags & ERTS_EV_FLAG_DEFER_IN_EV)) {
evoff |= ERTS_POLL_EV_IN;
state->flags |= ERTS_EV_FLAG_DEFER_IN_EV;
@@ -657,15 +691,17 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
if (is_iotask_active(&state->driver.select->outiotask, current_cio_time)) {
active = 1;
- if ((state->events & ERTS_POLL_EV_OUT)
+ if (MUST_DEFER(may_sleep)
+ && (state->events & ERTS_POLL_EV_OUT)
&& !(state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)) {
evoff |= ERTS_POLL_EV_OUT;
state->flags |= ERTS_EV_FLAG_DEFER_OUT_EV;
}
}
else if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV) {
- if (state->events & ERTS_POLL_EV_OUT)
+ if (state->events & ERTS_POLL_EV_OUT) {
evon |= ERTS_POLL_EV_OUT;
+ }
state->flags &= ~ERTS_EV_FLAG_DEFER_OUT_EV;
}
if (active)
@@ -681,6 +717,12 @@ check_cleanup_active_fd(ErtsSysFdType fd,
free_select = state->driver.select;
state->driver.select = NULL;
}
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+ if (evon) {
+ int do_wake = 0;
+ ERTS_CIO_POLL_CTL(psi->ps, state->fd, evon, 1, &do_wake);
+ }
+#endif
}
if (state->driver.nif) {
@@ -705,7 +747,7 @@ check_cleanup_active_fd(ErtsSysFdType fd,
}
if (rm_events) {
int do_wake = 0;
- state->events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd,
+ state->events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd,
rm_events, 0, &do_wake);
}
if (state->events)
@@ -720,7 +762,6 @@ check_cleanup_active_fd(ErtsSysFdType fd,
if (((state->type != ERTS_EV_TYPE_NONE) | state->remove_cnt | active) == 0)
hash_erase_drv_ev_state(state);
#endif
-
}
erts_mtx_unlock(mtx);
@@ -737,28 +778,23 @@ check_cleanup_active_fd(ErtsSysFdType fd,
pcep->events = evoff;
pcep->on = 0;
}
- if (evon) {
- ErtsPollControlEntry *pcep = &pce[(*pce_ix)++];
- pcep->fd = fd;
- pcep->events = evon;
- pcep->on = 1;
- }
#endif
return active;
}
static void
-check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
+check_cleanup_active_fds(struct pollset_info* psi,
+ erts_aint_t current_cio_time, int may_sleep)
{
- int six = pollset.active_fd.six;
- int eix = pollset.active_fd.eix;
- erts_aint32_t no = erts_atomic32_read_dirty(&pollset.active_fd.no);
- int size = pollset.active_fd.size;
+ int six = psi->active_fd.six;
+ int eix = psi->active_fd.eix;
+ erts_aint32_t no = erts_atomic32_read_dirty(&psi->active_fd.no);
+ const int size = psi->active_fd.size;
int ix = six;
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
- /* every fd might add two entries */
- Uint pce_sz = 2*sizeof(ErtsPollControlEntry)*no;
+ /* every fd might add one entry */
+ Uint pce_sz = sizeof(ErtsPollControlEntry)*no;
ErtsPollControlEntry *pctrl_entries = (pce_sz
? erts_alloc(ERTS_ALC_T_TMP, pce_sz)
: NULL);
@@ -766,12 +802,12 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
#endif
while (ix != eix) {
- ErtsSysFdType fd = pollset.active_fd.array[ix];
+ ErtsSysFdType fd = psi->active_fd.array[ix];
int nix = ix + 1;
if (nix >= size)
nix = 0;
ASSERT(fd != ERTS_SYS_FD_INVALID);
- if (!check_cleanup_active_fd(fd,
+ if (!check_cleanup_active_fd(psi, fd,
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
pctrl_entries,
&pctrl_ix,
@@ -781,14 +817,14 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
no--;
if (ix == six) {
#ifdef DEBUG
- pollset.active_fd.array[ix] = ERTS_SYS_FD_INVALID;
+ psi->active_fd.array[ix] = ERTS_SYS_FD_INVALID;
#endif
six = nix;
}
else {
- pollset.active_fd.array[ix] = pollset.active_fd.array[six];
+ psi->active_fd.array[ix] = psi->active_fd.array[six];
#ifdef DEBUG
- pollset.active_fd.array[six] = ERTS_SYS_FD_INVALID;
+ psi->active_fd.array[six] = ERTS_SYS_FD_INVALID;
#endif
six++;
if (six >= size)
@@ -801,53 +837,53 @@ check_cleanup_active_fds(erts_aint_t current_cio_time, int may_sleep)
#if ERTS_CIO_DEFER_ACTIVE_EVENTS
ASSERT(pctrl_ix <= pce_sz/sizeof(ErtsPollControlEntry));
if (pctrl_ix)
- ERTS_CIO_POLL_CTLV(pollset.ps, pctrl_entries, pctrl_ix);
+ ERTS_CIO_POLL_CTLV(psi->ps, pctrl_entries, pctrl_ix);
if (pctrl_entries)
erts_free(ERTS_ALC_T_TMP, pctrl_entries);
#endif
- pollset.active_fd.six = six;
- pollset.active_fd.eix = eix;
- erts_atomic32_set_relb(&pollset.active_fd.no, no);
+ psi->active_fd.six = six;
+ psi->active_fd.eix = eix;
+ erts_atomic32_set_relb(&psi->active_fd.no, no);
}
-static void grow_active_fds(void)
+static void grow_active_fds(struct pollset_info *psi)
{
- ASSERT(pollset.active_fd.six == pollset.active_fd.eix);
- pollset.active_fd.six = 0;
- pollset.active_fd.eix = pollset.active_fd.size;
- pollset.active_fd.size += ERTS_ACTIVE_FD_INC;
- pollset.active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- pollset.active_fd.array,
- pollset.active_fd.size*sizeof(ErtsSysFdType));
+ ASSERT(psi->active_fd.six == psi->active_fd.eix);
+ psi->active_fd.six = 0;
+ psi->active_fd.eix = psi->active_fd.size;
+ psi->active_fd.size += ERTS_ACTIVE_FD_INC;
+ psi->active_fd.array = erts_realloc(ERTS_ALC_T_ACTIVE_FD_ARR,
+ psi->active_fd.array,
+ psi->active_fd.size*sizeof(ErtsSysFdType));
#ifdef DEBUG
{
int i;
- for (i = pollset.active_fd.eix + 1; i < pollset.active_fd.size; i++)
- pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
+ for (i = psi->active_fd.eix + 1; i < psi->active_fd.size; i++)
+ psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
}
#endif
}
static ERTS_INLINE void
-add_active_fd(ErtsSysFdType fd)
+add_active_fd(struct pollset_info *psi, ErtsSysFdType fd)
{
- int eix = pollset.active_fd.eix;
- int size = pollset.active_fd.size;
+ int eix = psi->active_fd.eix;
+ const int size = psi->active_fd.size;
- pollset.active_fd.array[eix] = fd;
+ psi->active_fd.array[eix] = fd;
- erts_atomic32_set_relb(&pollset.active_fd.no,
- (erts_atomic32_read_dirty(&pollset.active_fd.no)
+ erts_atomic32_set_relb(&psi->active_fd.no,
+ (erts_atomic32_read_dirty(&psi->active_fd.no)
+ 1));
eix++;
if (eix >= size)
eix = 0;
- pollset.active_fd.eix = eix;
+ psi->active_fd.eix = eix;
- if (pollset.active_fd.six == eix) {
- grow_active_fds();
+ if (psi->active_fd.six == eix) {
+ grow_active_fds(psi);
}
}
@@ -862,9 +898,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
Eterm id = erts_drvport2id(ix);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
- ErtsPollEvents new_events, old_events;
+ ErtsPollEvents old_events;
ErtsDrvEventState *state;
- int wake_poller;
+ int wake_poller = 0;
int ret;
ErtsDrvSelectDataState *free_select = NULL;
ErtsNifSelectDataState *free_nif = NULL;
@@ -898,22 +934,25 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
state = hash_get_drv_ev_state(fd); /* may be NULL! */
#endif
- if (!on && (mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
- if (IS_FD_UNKNOWN(state)) {
- /* fast track to stop_select callback */
- stop_select_fn = prt->drv_ptr->stop_select;
-#ifdef USE_VM_PROBES
- strncpy(name, prt->drv_ptr->name,
- sizeof(DTRACE_CHARBUF_NAME(name))-1);
- name[sizeof(name)-1] = '\0';
-#endif
- ret = 0;
- goto done_unknown;
- }
- mode |= (ERL_DRV_READ | ERL_DRV_WRITE);
- wake_poller = 1; /* to eject fd from pollset (if needed) */
+ if (!on) {
+ if (IS_FD_UNKNOWN(state)) {
+ if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
+ /* fast track to stop_select callback */
+ stop_select_fn = prt->drv_ptr->stop_select;
+ #ifdef USE_VM_PROBES
+ strncpy(name, prt->drv_ptr->name,
+ sizeof(DTRACE_CHARBUF_NAME(name))-1);
+ name[sizeof(name)-1] = '\0';
+ #endif
+ }
+ ret = 0;
+ goto done_unknown;
+ }
+ else if ((mode&ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
+ mode |= (ERL_DRV_READ | ERL_DRV_WRITE);
+ wake_poller = 1; /* to eject fd from pollset (if needed) */
+ }
}
- else wake_poller = 0;
#ifndef ERTS_SYS_CONTINOUS_FD_NUMBERS
if (state == NULL) {
@@ -951,7 +990,7 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (owner != id && is_not_nil(owner))
drv_select_steal(ix, state, mode, on);
}
- ctl_events |= ERTS_POLL_EV_IN;
+ ctl_events = ERTS_POLL_EV_IN;
}
if (mode & ERL_DRV_WRITE) {
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
@@ -962,44 +1001,61 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
ctl_events |= ERTS_POLL_EV_OUT;
}
+
ASSERT((state->type == ERTS_EV_TYPE_DRV_SEL) ||
(state->type == ERTS_EV_TYPE_NONE && !state->events));
- if (!on && !(state->flags & ERTS_EV_FLAG_USED)
- && state->events && !(state->events & ~ctl_events)) {
- /* Old driver removing all events. At least wake poller.
- It will not make close() 100% safe but it will prevent
- actions delayed by poll timeout. */
- wake_poller = 1;
- }
+ old_events = state->events;
- new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+ if (on) {
+ ctl_events &= ~old_events;
+ state->events |= ctl_events;
+ }
+ else {
+ ctl_events &= old_events;
+ state->events &= ~ctl_events;
- if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
- if (state->type == ERTS_EV_TYPE_DRV_SEL && !state->events) {
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.select->inport = NIL;
- state->driver.select->outport = NIL;
- }
- ret = -1;
- goto done;
+ if (!(state->flags & ERTS_EV_FLAG_USED)
+ && old_events && !state->events) {
+ /*
+ * Old driver removing all events. At least wake poller.
+ * It will not make close() 100% safe but it will prevent
+ * actions delayed by poll timeout.
+ */
+ wake_poller = 1;
+ }
}
- old_events = state->events;
+ if (ctl_events) {
+ ErtsPollEvents new_events;
- ASSERT(on
- ? (new_events == (state->events | ctl_events))
- : (new_events == (state->events & ~ctl_events)));
+ if (!state->pollset) {
+ ErtsSchedulerData* esdp = erts_get_scheduler_data();
+ ASSERT(esdp);
+ state->pollset = esdp->pollset;
+ }
- ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL
- || state->type == ERTS_EV_TYPE_NONE);
+ new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
- state->events = new_events;
- if (ctl_events) {
- if (on) {
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_DRV_SEL && !old_events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags = 0;
+ state->driver.select->inport = NIL;
+ state->driver.select->outport = NIL;
+ }
+ ret = -1;
+ goto done;
+ }
+
+ ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL
+ || state->type == ERTS_EV_TYPE_NONE);
+ }
+
+ if (on) {
+ if (ctl_events) {
if (!state->driver.select)
- state->driver.select = alloc_drv_select_data();
+ state->driver.select = alloc_drv_select_data(state->pollset);
if (state->type == ERTS_EV_TYPE_NONE)
state->type = ERTS_EV_TYPE_DRV_SEL;
ASSERT(state->type == ERTS_EV_TYPE_DRV_SEL);
@@ -1010,8 +1066,9 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
if (mode & ERL_DRV_USE) {
state->flags |= ERTS_EV_FLAG_USED;
}
- }
- else { /* off */
+ }
+ }
+ else { /* off */
if (state->type == ERTS_EV_TYPE_DRV_SEL) {
if (ctl_events & ERTS_POLL_EV_IN) {
abort_tasks(state, ERL_DRV_READ);
@@ -1021,20 +1078,20 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
abort_tasks(state, ERL_DRV_WRITE);
state->driver.select->outport = NIL;
}
- if (new_events == 0) {
+ if (state->events == 0) {
if (old_events != 0) {
- remember_removed(state, &pollset);
+ remember_removed(state);
}
if ((mode & ERL_DRV_USE) || !(state->flags & ERTS_EV_FLAG_USED)) {
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
}
/*else keep it, as fd will probably be selected upon again */
}
}
if ((mode & ERL_DRV_USE_NO_CALLBACK) == ERL_DRV_USE) {
erts_driver_t* drv_ptr = prt->drv_ptr;
- ASSERT(new_events==0);
+ ASSERT(state->events==0);
if (state->remove_cnt == 0 || !wake_poller) {
/* Safe to close fd now as it is not in pollset
or there was no need to eject fd (kernel poll) */
@@ -1053,7 +1110,6 @@ ERTS_CIO_EXPORT(driver_select)(ErlDrvPort ix,
}
}
}
- }
}
ret = 0;
@@ -1093,7 +1149,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ErtsResource* resource = DATA_TO_RESOURCE(obj);
ErtsSysFdType fd = (ErtsSysFdType) e;
ErtsPollEvents ctl_events = (ErtsPollEvents) 0;
- ErtsPollEvents new_events, old_events;
+ ErtsPollEvents old_events;
ErtsDrvEventState *state;
int wake_poller;
int ret;
@@ -1192,32 +1248,45 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
ASSERT((state->type == ERTS_EV_TYPE_NIF) ||
(state->type == ERTS_EV_TYPE_NONE && !state->events));
- new_events = ERTS_CIO_POLL_CTL(pollset.ps, state->fd, ctl_events, on, &wake_poller);
+ old_events = state->events;
- if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
- if (state->type == ERTS_EV_TYPE_NIF && !state->events) {
- state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
- state->driver.nif->in.pid = NIL;
- state->driver.nif->out.pid = NIL;
- state->driver.nif->in.ddeselect_cnt = 0;
- state->driver.nif->out.ddeselect_cnt = 0;
- state->driver.stop.resource = NULL;
- }
- ret = INT_MIN | ERL_NIF_SELECT_FAILED;
- goto done;
+ if (on) {
+ ctl_events &= ~old_events;
+ state->events |= ctl_events;
+ }
+ else {
+ ctl_events &= old_events;
+ state->events &= ~ctl_events;
}
- old_events = state->events;
+ if (ctl_events) {
+ ErtsPollEvents new_events;
+
+ if (!state->pollset) {
+ state->pollset = erts_get_scheduler_data()->pollset;
+ }
- ASSERT(on
- ? (new_events == (state->events | ctl_events))
- : (new_events == (state->events & ~ctl_events)));
+ new_events = ERTS_CIO_POLL_CTL(state->pollset->ps, state->fd, ctl_events, on, &wake_poller);
+
+ if (new_events & (ERTS_POLL_EV_ERR|ERTS_POLL_EV_NVAL)) {
+ if (state->type == ERTS_EV_TYPE_NIF && !old_events) {
+ state->type = ERTS_EV_TYPE_NONE;
+ state->flags = 0;
+ state->driver.nif->in.pid = NIL;
+ state->driver.nif->out.pid = NIL;
+ state->driver.nif->in.ddeselect_cnt = 0;
+ state->driver.nif->out.ddeselect_cnt = 0;
+ state->driver.stop.resource = NULL;
+ }
+ ret = INT_MIN | ERL_NIF_SELECT_FAILED;
+ goto done;
+ }
+ ASSERT(new_events == state->events);
+ }
ASSERT(state->type == ERTS_EV_TYPE_NIF
|| state->type == ERTS_EV_TYPE_NONE);
- state->events = new_events;
if (on) {
const Eterm recipient = pid ? pid->pid : env->proc->common.id;
Uint32* refn;
@@ -1230,7 +1299,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
ASSERT(state->type == ERTS_EV_TYPE_NIF);
ASSERT(state->driver.stop.resource == resource);
- if (ctl_events & ERTS_POLL_EV_IN) {
+ if (mode & ERL_DRV_READ) {
state->driver.nif->in.pid = recipient;
if (is_immed(ref)) {
state->driver.nif->in.immed = ref;
@@ -1244,7 +1313,7 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
}
state->driver.nif->in.ddeselect_cnt = 0;
}
- if (ctl_events & ERTS_POLL_EV_OUT) {
+ if (mode & ERL_DRV_WRITE) {
state->driver.nif->out.pid = recipient;
if (is_immed(ref)) {
state->driver.nif->out.immed = ref;
@@ -1267,10 +1336,10 @@ ERTS_CIO_EXPORT(enif_select)(ErlNifEnv* env,
state->driver.nif->in.ddeselect_cnt = 0;
state->driver.nif->out.ddeselect_cnt = 0;
if (old_events != 0) {
- remember_removed(state, &pollset);
+ remember_removed(state);
}
}
- ASSERT(new_events==0);
+ ASSERT(state->events==0);
if (state->remove_cnt == 0 || !wake_poller) {
/*
* Safe to close fd now as it is not in pollset
@@ -1567,7 +1636,7 @@ steal_pending_stop_use(erts_dsprintf_buf_t *dsbufp, ErlDrvPort ix,
erts_ddll_dereference_driver(state->driver.stop.drv_ptr->handle);
}
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.drv_ptr = NULL;
}
else {
@@ -1606,7 +1675,7 @@ steal_pending_stop_nif(erts_dsprintf_buf_t *dsbufp, ErtsResource* resource,
enif_release_resource(state->driver.stop.resource);
state->type = ERTS_EV_TYPE_NONE;
- state->flags &= ~ERTS_EV_FLAG_USED;
+ state->flags = 0;
state->driver.stop.resource = NULL;
}
else {
@@ -1656,7 +1725,7 @@ iready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_READ);
}
- add_active_fd(state->fd);
+ add_active_fd(state->pollset, state->fd);
}
}
@@ -1674,7 +1743,7 @@ oready(Eterm id, ErtsDrvEventState *state, erts_aint_t current_cio_time)
(ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_WRITE);
}
- add_active_fd(state->fd);
+ add_active_fd(state->pollset, state->fd);
}
}
@@ -1729,19 +1798,20 @@ send_event_tuple(struct erts_nif_select_event* e, ErtsResource* resource,
static void bad_fd_in_pollset(ErtsDrvEventState *, Eterm inport, Eterm outport);
void
-ERTS_CIO_EXPORT(erts_check_io_interrupt)(int set)
+ERTS_CIO_EXPORT(erts_check_io_interrupt)(struct pollset_info *psi, int set)
{
- ERTS_CIO_POLL_INTR(pollset.ps, set);
+ ERTS_CIO_POLL_INTR(psi->ps, set);
}
void
-ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(int set,
+ERTS_CIO_EXPORT(erts_check_io_interrupt_timed)(struct pollset_info *psi,
+ int set,
ErtsMonotonicTime timeout_time)
{
- ERTS_CIO_POLL_INTR_TMD(pollset.ps, set, timeout_time);
+ ERTS_CIO_POLL_INTR_TMD(psi->ps, set, timeout_time);
}
-#if !ERTS_CIO_DEFER_ACTIVE_EVENTS
+#ifndef __WIN32__
/*
* Number of ignored events, for a lingering fd added by enif_select(),
* until we deselect fd-event from pollset.
@@ -1761,8 +1831,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
int poll_ret, i;
erts_aint_t current_cio_time;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
-
- ASSERT(esdp);
+ struct pollset_info *psi = esdp->pollset;
restart:
@@ -1781,24 +1850,25 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
* erts_check_io_time, since only one thread can
* check io at a time.
*/
- current_cio_time = erts_atomic_read_dirty(&erts_check_io_time);
+ current_cio_time = erts_atomic_read_dirty(&psi->check_io_time);
current_cio_time++;
- erts_atomic_set_relb(&erts_check_io_time, current_cio_time);
+ erts_atomic_set_relb(&psi->check_io_time, current_cio_time);
- check_cleanup_active_fds(current_cio_time,
+ check_cleanup_active_fds(psi,
+ current_cio_time,
timeout_time != ERTS_POLL_NO_TIMEOUT);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
#endif
- pollres_len = erts_atomic32_read_dirty(&pollset.active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN;
+ pollres_len = erts_atomic32_read_dirty(&psi->active_fd.no) + ERTS_CHECK_IO_POLL_RES_LEN;
pollres = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollResFd)*pollres_len);
- erts_atomic_set_nob(&pollset.in_poll_wait, 1);
+ erts_atomic_set_nob(&psi->in_poll_wait, 1);
- poll_ret = ERTS_CIO_POLL_WAIT(pollset.ps, pollres, &pollres_len, timeout_time);
+ poll_ret = ERTS_CIO_POLL_WAIT(psi->ps, pollres, &pollres_len, timeout_time);
#ifdef ERTS_ENABLE_LOCK_CHECK
erts_lc_check_exact(NULL, 0); /* No locks should be locked */
@@ -1810,8 +1880,8 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#endif
if (poll_ret != 0) {
- erts_atomic_set_nob(&pollset.in_poll_wait, 0);
- forget_removed(&pollset);
+ erts_atomic_set_nob(&psi->in_poll_wait, 0);
+ forget_removed(psi);
erts_free(ERTS_ALC_T_TMP, pollres);
if (poll_ret == EAGAIN) {
goto restart;
@@ -1838,6 +1908,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
erts_mtx_lock(fd_mtx(fd));
+
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
state = &drv_ev_state.v[ (int) fd];
#else
@@ -1848,7 +1919,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
#endif
/* Skip this fd if it was removed from pollset */
- if (is_removed(state)) {
+ if (is_removed(state) || state->pollset != psi) {
goto next_pollres;
}
@@ -1884,7 +1955,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
bad_fd_in_pollset(state,
state->driver.select->inport,
state->driver.select->outport);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
break;
}
@@ -1915,7 +1986,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
resource = state->driver.stop.resource;
state->driver.nif->out.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->out.pid = NIL;
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
else {
ASSERT(state->driver.nif->out.ddeselect_cnt >= 2);
@@ -1928,7 +1999,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
resource = state->driver.stop.resource;
state->driver.nif->in.ddeselect_cnt = ERTS_NIF_DELAYED_DESELECT;
state->driver.nif->in.pid = NIL;
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
else {
ASSERT(state->driver.nif->in.ddeselect_cnt >= 2);
@@ -1938,7 +2009,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
}
else if (revents & ERTS_POLL_EV_NVAL) {
bad_fd_in_pollset(state, NIL, NIL);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
}
erts_mtx_unlock(fd_mtx(fd));
@@ -1963,7 +2034,7 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
(int) state->type);
ASSERT(0);
deselect(state, 0);
- add_active_fd(state->fd);
+ add_active_fd(psi, state->fd);
break;
}
}
@@ -1973,9 +2044,9 @@ ERTS_CIO_EXPORT(erts_check_io)(int do_wait)
next_pollres_unlocked:;
}
- erts_atomic_set_nob(&pollset.in_poll_wait, 0);
+ erts_atomic_set_nob(&psi->in_poll_wait, 0);
erts_free(ERTS_ALC_T_TMP, pollres);
- forget_removed(&pollset);
+ forget_removed(psi);
}
static void
@@ -2096,16 +2167,17 @@ static void drv_ev_state_free(void *des)
}
#endif
-
#ifdef ERTS_ENABLE_KERNEL_POLL
struct io_functions {
int (*select)(ErlDrvPort, ErlDrvEvent, int, int);
int (*enif_select)(ErlNifEnv*, ErlNifEvent, enum ErlNifSelectFlags, void*, const ErlNifPid*, Eterm);
- void (*check_io_as_interrupt)(void);
- void (*check_io_interrupt)(int);
- void (*check_io_interrupt_tmd)(int, ErtsMonotonicTime);
+ void (*check_io_as_interrupt)(struct pollset_info*);
+ void (*check_io_interrupt)(struct pollset_info*, int);
+ void (*check_io_interrupt_tmd)(struct pollset_info*, int, ErtsMonotonicTime);
void (*check_io)(int);
+ struct pollset_info* (*get_pollset)(int sched_nr);
+ void (*notify_port_task_executed)(ErtsPortTaskHandle *pthp);
int (*max_files)(void);
Uint (*size)(void);
Eterm (*info)(void *);
@@ -2126,12 +2198,11 @@ extern struct io_functions erts_io_funcs;
void
ERTS_CIO_EXPORT(erts_init_check_io)(void)
{
+ int j;
ERTS_CT_ASSERT((INT_MIN & (ERL_NIF_SELECT_STOP_CALLED |
ERL_NIF_SELECT_STOP_SCHEDULED |
ERL_NIF_SELECT_INVALID_EVENT |
ERL_NIF_SELECT_FAILED)) == 0);
- erts_atomic_init_nob(&erts_check_io_time, 0);
- erts_atomic_init_nob(&pollset.in_poll_wait, 0);
#ifdef ERTS_ENABLE_KERNEL_POLL
ASSERT(erts_io_funcs.select == NULL);
@@ -2140,6 +2211,8 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void)
erts_io_funcs.check_io_interrupt = ERTS_CIO_EXPORT(erts_check_io_interrupt);
erts_io_funcs.check_io_interrupt_tmd= ERTS_CIO_EXPORT(erts_check_io_interrupt_timed);
erts_io_funcs.check_io = ERTS_CIO_EXPORT(erts_check_io);
+ erts_io_funcs.get_pollset = ERTS_CIO_EXPORT(erts_get_pollset);
+ erts_io_funcs.notify_port_task_executed = ERTS_CIO_EXPORT(erts_io_notify_port_task_executed);
erts_io_funcs.max_files = ERTS_CIO_EXPORT(erts_check_io_max_files);
erts_io_funcs.size = ERTS_CIO_EXPORT(erts_check_io_size);
erts_io_funcs.info = ERTS_CIO_EXPORT(erts_check_io_info);
@@ -2149,25 +2222,34 @@ ERTS_CIO_EXPORT(erts_init_check_io)(void)
#endif
#endif
+ init_removed_fd_alloc();
+
ERTS_CIO_POLL_INIT();
- pollset.ps = ERTS_CIO_NEW_POLLSET();
-
- pollset.active_fd.six = 0;
- pollset.active_fd.eix = 0;
- erts_atomic32_init_nob(&pollset.active_fd.no, 0);
- pollset.active_fd.size = ERTS_ACTIVE_FD_INC;
- pollset.active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR,
- sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC);
+ pollsetv = erts_alloc(ERTS_ALC_T_POLLSET,
+ sizeof(struct pollset_info)*erts_no_schedulers);
+ for (j=0; j < erts_no_schedulers; j++) {
+ struct pollset_info* psi = &pollsetv[j];
+
+ erts_atomic_init_nob(&psi->check_io_time, 0);
+ erts_atomic_init_nob(&psi->in_poll_wait, 0);
+ psi->ps = ERTS_CIO_NEW_POLLSET();
+ psi->active_fd.six = 0;
+ psi->active_fd.eix = 0;
+ erts_atomic32_init_nob(&psi->active_fd.no, 0);
+ psi->active_fd.size = ERTS_ACTIVE_FD_INC;
+ psi->active_fd.array = erts_alloc(ERTS_ALC_T_ACTIVE_FD_ARR,
+ sizeof(ErtsSysFdType)*ERTS_ACTIVE_FD_INC);
#ifdef DEBUG
- {
- int i;
- for (i = 0; i < ERTS_ACTIVE_FD_INC; i++)
- pollset.active_fd.array[i] = ERTS_SYS_FD_INVALID;
- }
+ {
+ int i;
+ for (i = 0; i < ERTS_ACTIVE_FD_INC; i++)
+ psi->active_fd.array[i] = ERTS_SYS_FD_INVALID;
+ }
#endif
- init_removed_fd_alloc();
- erts_atomic_init_nob(&pollset.removed_list, (erts_aint_t)NULL);
+ erts_atomic_init_nob(&psi->removed_list, (erts_aint_t)NULL);
+ }
+
{
int i;
for (i=0; i<DRV_EV_STATE_LOCK_CNT; i++) {
@@ -2211,10 +2293,14 @@ ERTS_CIO_EXPORT(erts_check_io_max_files)(void)
Uint
ERTS_CIO_EXPORT(erts_check_io_size)(void)
{
- Uint res;
+ Uint res = 0;
ErtsPollInfo pi;
- ERTS_CIO_POLL_INFO(pollset.ps, &pi);
- res = pi.memory_size;
+ int i;
+
+ for (i = 0; i < erts_no_schedulers; i++) {
+ ERTS_CIO_POLL_INFO(pollsetv[i].ps, &pi);
+ res += pi.memory_size;
+ }
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
res += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
#else
@@ -2235,105 +2321,122 @@ Eterm
ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
{
Process *p = (Process *) proc;
- Eterm tags[16], values[16], res;
- Uint sz, *szp, *hp, **hpp, memory_size;
- Sint i;
- ErtsPollInfo pi;
- erts_aint_t cio_time = erts_atomic_read_acqb(&erts_check_io_time);
- int active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no);
-
- while (1) {
- erts_aint_t post_cio_time;
- int post_active_fds;
-
- ERTS_CIO_POLL_INFO(pollset.ps, &pi);
-
- post_cio_time = erts_atomic_read_mb(&erts_check_io_time);
- post_active_fds = (int) erts_atomic32_read_acqb(&pollset.active_fd.no);
- if (cio_time == post_cio_time && active_fds == post_active_fds)
- break;
- cio_time = post_cio_time;
- active_fds = post_active_fds;
- }
+ Eterm tags[16], values[16], res, list = NIL;
+ Uint sz, *szp, *hp, **hpp;
+ ErtsPollInfo *piv;
+ Sint i, j;
+
+ piv = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(ErtsPollInfo) * erts_no_schedulers);
+
+ for (j = 0; j < erts_no_schedulers; j++) {
+ struct pollset_info *psi = &pollsetv[j];
+ erts_aint_t cio_time = erts_atomic_read_acqb(&psi->check_io_time);
+
+ piv[j].active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
+ while (1) {
+ erts_aint_t post_cio_time;
+ int post_active_fds;
+
+ ERTS_CIO_POLL_INFO(psi->ps, &piv[j]);
+
+ post_cio_time = erts_atomic_read_mb(&psi->check_io_time);
+ post_active_fds = (int) erts_atomic32_read_acqb(&psi->active_fd.no);
+ if (cio_time == post_cio_time && piv[j].active_fds == post_active_fds)
+ break;
+ cio_time = post_cio_time;
+ piv[j].active_fds = post_active_fds;
+ }
- memory_size = pi.memory_size;
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
-#else
- memory_size += safe_hash_table_sz(&drv_ev_state.tab);
- {
- SafeHashInfo hi;
- safe_hash_get_info(&hi, &drv_ev_state.tab);
- memory_size += hi.objs * sizeof(ErtsDrvEventState);
+ #ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
+ piv[j].memory_size += sizeof(ErtsDrvEventState) * erts_atomic_read_nob(&drv_ev_state.len);
+ #else
+ piv[j].memory_size += safe_hash_table_sz(&drv_ev_state.tab);
+ {
+ SafeHashInfo hi;
+ safe_hash_get_info(&hi, &drv_ev_state.tab);
+ piv[j].memory_size += hi.objs * sizeof(ErtsDrvEventState);
+ }
+ erts_spin_lock(&drv_ev_state.prealloc_lock);
+ piv[j].memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
+ erts_spin_unlock(&drv_ev_state.prealloc_lock);
+ #endif
}
- erts_spin_lock(&drv_ev_state.prealloc_lock);
- memory_size += drv_ev_state.num_prealloc * sizeof(ErtsDrvEventState);
- erts_spin_unlock(&drv_ev_state.prealloc_lock);
-#endif
hpp = NULL;
szp = &sz;
sz = 0;
bld_it:
- i = 0;
- tags[i] = erts_bld_atom(hpp, szp, "name");
- values[i++] = erts_bld_atom(hpp, szp, "erts_poll");
+ for (j = erts_no_schedulers-1; j >= 0; j--) {
+ i = 0;
- tags[i] = erts_bld_atom(hpp, szp, "primary");
- values[i++] = erts_bld_atom(hpp, szp, pi.primary);
+ tags[i] = erts_bld_atom(hpp, szp, "name");
+ values[i++] = erts_bld_atom(hpp, szp, "erts_poll");
- tags[i] = erts_bld_atom(hpp, szp, "fallback");
- values[i++] = erts_bld_atom(hpp, szp, pi.fallback ? pi.fallback : "false");
+ tags[i] = erts_bld_atom(hpp, szp, "primary");
+ values[i++] = erts_bld_atom(hpp, szp, piv[j].primary);
- tags[i] = erts_bld_atom(hpp, szp, "kernel_poll");
- values[i++] = erts_bld_atom(hpp, szp,
- pi.kernel_poll ? pi.kernel_poll : "false");
+ tags[i] = erts_bld_atom(hpp, szp, "fallback");
+ values[i++] = erts_bld_atom(hpp, szp, piv[j].fallback ? piv[j].fallback : "false");
- tags[i] = erts_bld_atom(hpp, szp, "memory_size");
- values[i++] = erts_bld_uint(hpp, szp, memory_size);
+ tags[i] = erts_bld_atom(hpp, szp, "kernel_poll");
+ values[i++] = erts_bld_atom(hpp, szp,
+ piv[j].kernel_poll ? piv[j].kernel_poll : "false");
- tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.poll_set_size);
+ tags[i] = erts_bld_atom(hpp, szp, "memory_size");
+ values[i++] = erts_bld_uint(hpp, szp, piv[j].memory_size);
- if (pi.fallback) {
- tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.fallback_poll_set_size);
- }
+ tags[i] = erts_bld_atom(hpp, szp, "total_poll_set_size");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].poll_set_size);
+
+ if (piv[j].fallback) {
+ tags[i] = erts_bld_atom(hpp, szp, "fallback_poll_set_size");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].fallback_poll_set_size);
+ }
- tags[i] = erts_bld_atom(hpp, szp, "lazy_updates");
- values[i++] = pi.lazy_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "lazy_updates");
+ values[i++] = piv[j].lazy_updates ? am_true : am_false;
- if (pi.lazy_updates) {
- tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.pending_updates);
- }
+ if (piv[j].lazy_updates) {
+ tags[i] = erts_bld_atom(hpp, szp, "pending_updates");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].pending_updates);
+ }
- tags[i] = erts_bld_atom(hpp, szp, "batch_updates");
- values[i++] = pi.batch_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "batch_updates");
+ values[i++] = piv[j].batch_updates ? am_true : am_false;
- tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates");
- values[i++] = pi.concurrent_updates ? am_true : am_false;
+ tags[i] = erts_bld_atom(hpp, szp, "concurrent_updates");
+ values[i++] = piv[j].concurrent_updates ? am_true : am_false;
- tags[i] = erts_bld_atom(hpp, szp, "max_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.max_fds);
+ tags[i] = erts_bld_atom(hpp, szp, "max_fds");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].max_fds);
- tags[i] = erts_bld_atom(hpp, szp, "active_fds");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) active_fds);
+ tags[i] = erts_bld_atom(hpp, szp, "active_fds");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].active_fds);
-#ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_wakeups);
+ #ifdef ERTS_POLL_COUNT_AVOIDED_WAKEUPS
+ tags[i] = erts_bld_atom(hpp, szp, "no_avoided_wakeups");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_wakeups);
- tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_avoided_interrupts);
+ tags[i] = erts_bld_atom(hpp, szp, "no_avoided_interrupts");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_avoided_interrupts);
- tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed");
- values[i++] = erts_bld_uint(hpp, szp, (Uint) pi.no_interrupt_timed);
-#endif
+ tags[i] = erts_bld_atom(hpp, szp, "no_interrupt_timed");
+ values[i++] = erts_bld_uint(hpp, szp, (Uint) piv[j].no_interrupt_timed);
+ #endif
- res = erts_bld_2tup_list(hpp, szp, i, tags, values);
+ res = erts_bld_2tup_list(hpp, szp, i, tags, values);
+
+ if (!hpp) {
+ *szp += 2;
+ }
+ else {
+ list = CONS(*hpp, res, list);
+ *hpp += 2;
+ }
+ }
if (!hpp) {
hp = HAlloc(p, sz);
@@ -2342,7 +2445,9 @@ ERTS_CIO_EXPORT(erts_check_io_info)(void *proc)
goto bld_it;
}
- return res;
+ erts_free(ERTS_ALC_T_TMP, piv);
+
+ return list;
}
static ERTS_INLINE ErtsPollEvents
@@ -2373,6 +2478,23 @@ print_events(ErtsPollEvents ev)
return ev;
}
+static ERTS_INLINE void
+print_flags(EventStateFlags f)
+{
+ const char* delim = "";
+ if(f & ERTS_EV_FLAG_USED) {
+ erts_printf("%s","USED");
+ delim = "|";
+ }
+ if(f & ERTS_EV_FLAG_DEFER_IN_EV) {
+ erts_printf("%s%s", delim, "DRIN");
+ delim = "|";
+ }
+ if(f & ERTS_EV_FLAG_DEFER_OUT_EV) {
+ erts_printf("%s%s", delim, "DROUT");
+ }
+}
+
typedef struct {
int used_fds;
int num_errors;
@@ -2419,7 +2541,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
counters->used_fds++;
#endif
- erts_printf("fd=%d ", (int) fd);
+ erts_printf("pollset=%d fd=%d ",
+ (int)(state->pollset - pollsetv), (int) fd);
#if defined(HAVE_FSTAT) && !defined(NO_FSTAT_ON_SYS_FD_TYPE)
if (fstat((int) fd, &stat_buf) < 0)
@@ -2497,7 +2620,15 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
err = 1;
}
else {
- err = 1;
+ ErtsPollEvents ev = cio_events;
+#if ERTS_CIO_DEFER_ACTIVE_EVENTS
+ if (state->flags & ERTS_EV_FLAG_DEFER_IN_EV)
+ ev &= ~ERTS_POLL_EV_IN;
+ if (state->flags & ERTS_EV_FLAG_DEFER_OUT_EV)
+ ev &= ~ERTS_POLL_EV_OUT;
+#endif
+ if (ev != ep_events)
+ err = 1;
erts_printf("cio_ev=");
print_events(cio_events);
erts_printf(" ep_ev=");
@@ -2607,6 +2738,8 @@ static void doit_erts_check_io_debug(void *vstate, void *vcounters)
#endif
}
+ erts_printf(" flags="); print_flags(state->flags);
+
if (err) {
counters->num_errors++;
erts_printf(" ERROR");
@@ -2619,18 +2752,23 @@ int
ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
{
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- int fd, len;
+ int fd, len, i;
#endif
- IterDebugCounters counters;
+ IterDebugCounters counters = {0};
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
ErtsDrvEventState null_des;
+ null_des.pollset = NULL;
null_des.driver.select = NULL;
null_des.driver.nif = NULL;
null_des.driver.stop.drv_ptr = NULL;
null_des.events = 0;
null_des.remove_cnt = 0;
null_des.type = ERTS_EV_TYPE_NONE;
+ null_des.flags = 0;
+
+ counters.epep = erts_alloc(ERTS_ALC_T_TMP,
+ sizeof(ErtsPollEvents)*drv_ev_state.max_fds);
#endif
erts_printf("--- fds in pollset --------------------------------------\n");
@@ -2642,28 +2780,25 @@ ERTS_CIO_EXPORT(erts_check_io_debug)(ErtsCheckIoDebugInfo *ciodip)
erts_thr_progress_block(); /* stop the world to avoid messy locking */
#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
- counters.epep = erts_alloc(ERTS_ALC_T_TMP, sizeof(ErtsPollEvents)*drv_ev_state.max_fds);
- ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollset.ps, counters.epep, drv_ev_state.max_fds);
- counters.internal_fds = 0;
-#endif
- counters.used_fds = 0;
- counters.num_errors = 0;
- counters.no_driver_select_structs = 0;
- counters.no_enif_select_structs = 0;
-
-#ifdef ERTS_SYS_CONTINOUS_FD_NUMBERS
len = erts_atomic_read_nob(&drv_ev_state.len);
- for (fd = 0; fd < len; fd++) {
- doit_erts_check_io_debug((void *) &drv_ev_state.v[fd], (void *) &counters);
+
+ for (i = 0; i < erts_no_schedulers; i++) {
+ ERTS_POLL_EXPORT(erts_poll_get_selected_events)(pollsetv[i].ps,
+ counters.epep,
+ drv_ev_state.max_fds);
+ for (fd = 0; fd < len; fd++) {
+ if (drv_ev_state.v[fd].pollset == &pollsetv[i])
+ doit_erts_check_io_debug(&drv_ev_state.v[fd], &counters);
+ }
}
- for ( ; fd < drv_ev_state.max_fds; fd++) {
- null_des.fd = fd;
- doit_erts_check_io_debug((void *) &null_des, (void *) &counters);
+ for (fd = len ; fd < drv_ev_state.max_fds; fd++) {
+ null_des.fd = fd;
+ doit_erts_check_io_debug(&null_des, &counters);
}
#else
- safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug, (void *) &counters);
+ safe_hash_for_each(&drv_ev_state.tab, &doit_erts_check_io_debug,
+ &counters);
#endif
-
erts_thr_progress_unblock();
ciodip->no_used_fds = counters.used_fds;
@@ -2696,7 +2831,6 @@ void ERTS_CIO_EXPORT(erts_lcnt_update_cio_locks)(int enable) {
}
#endif /* ERTS_ENABLE_LOCK_COUNT */
-
#ifdef ERTS_ENABLE_KERNEL_POLL
# ifdef ERTS_KERNEL_POLL_VERSION
@@ -2724,6 +2858,16 @@ int enif_select(ErlNifEnv* env, ErlNifEvent event,
return (*erts_io_funcs.enif_select)(env, event, flags, obj, pid, ref);
}
+struct pollset_info* erts_get_pollset(int sched_nr)
+{
+ return (*erts_io_funcs.get_pollset)(sched_nr);
+}
+
+void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
+{
+ erts_io_funcs.notify_port_task_executed(pthp);
+}
+
void erts_check_io(int do_wait)
{
erts_io_funcs.check_io(do_wait);
@@ -2751,15 +2895,15 @@ erts_check_io_debug(ErtsCheckIoDebugInfo *ip)
return (*erts_io_funcs.check_io_debug)(ip);
}
-void erts_check_io_interrupt(int set)
+void erts_check_io_interrupt(struct pollset_info* psi, int set)
{
- erts_io_funcs.check_io_interrupt(set);
+ erts_io_funcs.check_io_interrupt(psi, set);
}
-void erts_check_io_interrupt_timed(int set,
+void erts_check_io_interrupt_timed(struct pollset_info* psi, int set,
ErtsMonotonicTime timeout_time)
{
- erts_io_funcs.check_io_interrupt_tmd(set, timeout_time);
+ erts_io_funcs.check_io_interrupt_tmd(psi, set, timeout_time);
}
#ifdef ERTS_ENABLE_LOCK_COUNT
diff --git a/erts/emulator/sys/common/erl_check_io.h b/erts/emulator/sys/common/erl_check_io.h
index f4d7983002..ab53d91756 100644
--- a/erts/emulator/sys/common/erl_check_io.h
+++ b/erts/emulator/sys/common/erl_check_io.h
@@ -30,38 +30,29 @@
#include "sys.h"
#include "erl_sys_driver.h"
+struct pollset_info;
+
Uint erts_check_io_size(void);
Eterm erts_check_io_info(void *);
+void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp);
+void erts_check_io_async_sig_interrupt(struct pollset_info *psi);
int erts_check_io_max_files(void);
-void erts_check_io_interrupt(int);
-void erts_check_io_interrupt_timed(int, ErtsMonotonicTime);
void erts_check_io(int);
void erts_init_check_io(void);
#ifdef ERTS_ENABLE_LOCK_COUNT
void erts_lcnt_update_cio_locks(int enable);
#endif
-extern erts_atomic_t erts_check_io_time;
+void erts_check_io_interrupt(struct pollset_info*, int);
+void erts_check_io_interrupt_timed(struct pollset_info*, int, ErtsMonotonicTime);
+struct pollset_info* erts_get_pollset(int sched_num);
typedef struct {
ErtsPortTaskHandle task;
erts_atomic_t executed_time;
+ struct pollset_info *pollset;
} ErtsIoTask;
-ERTS_GLB_INLINE void erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp);
-
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-ERTS_GLB_INLINE void
-erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
-{
- ErtsIoTask *itp = (ErtsIoTask *) (((char *) pthp) - offsetof(ErtsIoTask, task));
- erts_aint_t ci_time = erts_atomic_read_acqb(&erts_check_io_time);
- erts_atomic_set_relb(&itp->executed_time, ci_time);
-}
-
-#endif
-
#endif /* ERL_CHECK_IO_H__ */
#if !defined(ERL_CHECK_IO_C__) && !defined(ERTS_ALLOC_C__)
@@ -80,7 +71,7 @@ erts_io_notify_port_task_executed(ErtsPortTaskHandle *pthp)
*/
# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1
#else
-# define ERTS_CIO_DEFER_ACTIVE_EVENTS 0
+# define ERTS_CIO_DEFER_ACTIVE_EVENTS 1
#endif
typedef struct {
diff --git a/erts/emulator/sys/common/erl_poll.h b/erts/emulator/sys/common/erl_poll.h
index 12dfc66e51..6c961205fe 100644
--- a/erts/emulator/sys/common/erl_poll.h
+++ b/erts/emulator/sys/common/erl_poll.h
@@ -225,6 +225,7 @@ typedef struct {
long no_avoided_interrupts;
long no_interrupt_timed;
#endif
+ int active_fds;
} ErtsPollInfo;
void ERTS_POLL_EXPORT(erts_poll_interrupt)(ErtsPollSet,
diff --git a/erts/emulator/sys/common/erl_sys_common_misc.c b/erts/emulator/sys/common/erl_sys_common_misc.c
index 09237c81ce..420138ff0a 100644
--- a/erts/emulator/sys/common/erl_sys_common_misc.c
+++ b/erts/emulator/sys/common/erl_sys_common_misc.c
@@ -45,14 +45,6 @@
#endif
#endif
-/*
- * erts_check_io_time is used by the erl_check_io implementation. The
- * global erts_check_io_time variable is declared here since there
- * (often) exist two versions of erl_check_io (kernel-poll and
- * non-kernel-poll), and we dont want two versions of this variable.
- */
-erts_atomic_t erts_check_io_time;
-
/* Written once and only once */
static int filename_encoding = ERL_FILENAME_UNKNOWN;
diff --git a/erts/emulator/sys/unix/sys.c b/erts/emulator/sys/unix/sys.c
index 09c515291a..acd7920e86 100644
--- a/erts/emulator/sys/unix/sys.c
+++ b/erts/emulator/sys/unix/sys.c
@@ -499,6 +499,7 @@ static void signal_notify_requested(Eterm type) {
static ERTS_INLINE void
break_requested(void)
{
+ int i;
/*
* just set a flag - checked for and handled by
* scheduler threads erts_check_io() (not signal handler).
@@ -510,7 +511,10 @@ break_requested(void)
erts_exit(ERTS_INTR_EXIT, "");
ERTS_SET_BREAK_REQUESTED;
- erts_check_io_interrupt(1);
+ for (i=0; i < erts_no_schedulers; i++) {
+ /* Make sure we don't sleep in poll */
+ erts_check_io_interrupt(ERTS_SCHEDULER_IX(i)->pollset, 1);
+ }
}
static RETSIGTYPE request_break(int signum)
diff --git a/erts/emulator/sys/unix/sys_drivers.c b/erts/emulator/sys/unix/sys_drivers.c
index 7c9a532fed..0228e1af54 100644
--- a/erts/emulator/sys/unix/sys_drivers.c
+++ b/erts/emulator/sys/unix/sys_drivers.c
@@ -1723,8 +1723,6 @@ static ErlDrvData forker_start(ErlDrvPort port_num, char* name,
SET_NONBLOCKING(forker_fd);
- driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1);
-
return (ErlDrvData)port_num;
}
@@ -1821,10 +1819,19 @@ static void forker_ready_output(ErlDrvData e, ErlDrvEvent fd)
static ErlDrvSSizeT forker_control(ErlDrvData e, unsigned int cmd, char *buf,
ErlDrvSizeT len, char **rbuf, ErlDrvSizeT rlen)
{
+ static int first_call = 1;
ErtsSysForkerProto *proto = (ErtsSysForkerProto *)buf;
ErlDrvPort port_num = (ErlDrvPort)e;
int res;
+ if (first_call) {
+ /*
+ * Do driver_select here when schedulers and their pollsets have started.
+ */
+ driver_select(port_num, forker_fd, ERL_DRV_READ|ERL_DRV_USE, 1);
+ first_call = 0;
+ }
+
driver_enq(port_num, buf, len);
if (driver_sizeq(port_num) > sizeof(*proto)) {
return 0;
diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl
index c6d7f708be..332d11345b 100644
--- a/erts/emulator/test/driver_SUITE.erl
+++ b/erts/emulator/test/driver_SUITE.erl
@@ -83,6 +83,8 @@
-export([bin_prefix/2]).
+-export([get_check_io_total/1]). % for z_SUITE.erl
+
-include_lib("common_test/include/ct.hrl").
@@ -783,7 +785,7 @@ io_ready_exit(Config) when is_list(Config) ->
use_fallback_pollset(Config) when is_list(Config) ->
FlbkFun = fun () ->
- ChkIoDuring = erlang:system_info(check_io),
+ ChkIoDuring = get_check_io_total(erlang:system_info(check_io)),
case lists:keysearch(fallback_poll_set_size,
1,
ChkIoDuring) of
@@ -939,7 +941,7 @@ chkio_test({erts_poll_info, Before},
"ok" ->
chk_chkio_port(Port),
Fun(),
- During = erlang:system_info(check_io),
+ During = get_check_io_total(erlang:system_info(check_io)),
erlang:display(During),
0 = element(1, erts_debug:get_internal_state(check_io_debug)),
io:format("During test: ~p~n", [During]),
@@ -992,14 +994,14 @@ verify_chkio_state(Before, After) ->
ok.
get_stable_check_io_info() ->
- ChkIo = erlang:system_info(check_io),
- PendUpdNo = case lists:keysearch(pending_updates, 1, ChkIo) of
- {value, {pending_updates, PendNo}} ->
- PendNo;
+ ChkIo = get_check_io_total(erlang:system_info(check_io)),
+ PendUpdNo = case lists:keyfind(pending_updates, 1, ChkIo) of
+ {pending_updates, Value} ->
+ Value;
false ->
0
end,
- {value, {active_fds, ActFds}} = lists:keysearch(active_fds, 1, ChkIo),
+ {active_fds, ActFds} = lists:keyfind(active_fds, 1, ChkIo),
case {PendUpdNo, ActFds} of
{0, 0} ->
ChkIo;
@@ -1008,6 +1010,46 @@ get_stable_check_io_info() ->
get_stable_check_io_info()
end.
+%% Merge return from erlang:system_info(check_io)
+%% as if it was one big pollset.
+get_check_io_total(ChkIo) ->
+ lists:foldl(fun(Pollset, Acc) ->
+ lists:zipwith(fun(A, B) ->
+ add_pollset_infos(A,B)
+ end,
+ Pollset, Acc)
+ end,
+ hd(ChkIo),
+ tl(ChkIo)).
+
+add_pollset_infos({Tag, A}=TA , {Tag, B}=TB) ->
+ case tag_type(Tag) of
+ sum ->
+ {Tag, A + B};
+ const ->
+ case A of
+ B -> TA;
+ _ ->
+ ct:fail("Unexpected diff in pollsets ~p != ~p",
+ [TA,TB])
+ end
+ end.
+
+tag_type(name) -> const;
+tag_type(primary) -> const;
+tag_type(fallback) -> const;
+tag_type(kernel_poll) -> const;
+tag_type(memory_size) -> sum;
+tag_type(total_poll_set_size) -> sum;
+tag_type(fallback_poll_set_size) -> sum;
+tag_type(lazy_updates) -> const;
+tag_type(pending_updates) -> sum;
+tag_type(batch_updates) -> const;
+tag_type(concurrent_updates) -> const;
+tag_type(max_fds) -> const;
+tag_type(active_fds) -> sum.
+
+
%% Missed port lock when stealing control of fd from a
%% driver that didn't use the same lock. The lock checker
%% used to trigger on this and dump core.
diff --git a/erts/emulator/test/z_SUITE.erl b/erts/emulator/test/z_SUITE.erl
index feea7432a9..ac3df8bfbf 100644
--- a/erts/emulator/test/z_SUITE.erl
+++ b/erts/emulator/test/z_SUITE.erl
@@ -330,7 +330,7 @@ display_check_io(ChkIo) ->
ok.
get_check_io_info() ->
- ChkIo = erlang:system_info(check_io),
+ ChkIo = driver_SUITE:get_check_io_total(erlang:system_info(check_io)),
PendUpdNo = case lists:keysearch(pending_updates, 1, ChkIo) of
{value, {pending_updates, PendNo}} ->
PendNo;