diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 81 |
1 files changed, 72 insertions, 9 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 4928d80f27..c8f2e88127 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -97,6 +97,9 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q typedef union { struct { /* I/O tasks */ ErlDrvEvent event; +#if ERTS_POLL_USE_SCHEDULER_POLLING + int is_scheduler_event; +#endif } io; struct { ErtsProc2PortSigCallback callback; @@ -141,6 +144,9 @@ struct ErtsPortTaskBusyCallerTable_ { ErtsPortTaskBusyCaller pre_alloc_busy_caller; }; +#if ERTS_POLL_USE_SCHEDULER_POLLING +erts_atomic_t erts_port_task_outstanding_io_tasks; +#endif static void begin_port_cleanup(Port *pp, ErtsPortTask **execq, @@ -578,13 +584,26 @@ reset_handle(ErtsPortTask *ptp) } static ERTS_INLINE void -reset_executed_io_task_handle(ErtsPortTask *ptp) +reset_executed_io_task_handle(Port *prt, ErtsPortTask *ptp) { if (ptp->u.alive.handle) { ASSERT(ptp == handle2task(ptp->u.alive.handle)); - /* The port task handle is reset inside task_executed */ - erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle, - reset_port_task_handle); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (ptp->u.alive.td.io.is_scheduler_event) { + if ((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_CHECK_FD_CLEANUP)) { + erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle, + reset_port_task_handle); + erts_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_CHECK_FD_CLEANUP); + } else { + reset_port_task_handle(ptp->u.alive.handle); + } + } else +#endif + { + /* The port task handle is reset inside task_executed */ + erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle, + reset_port_task_handle); + } } } @@ -1307,6 +1326,22 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) res = - 1; /* Task already aborted, executing, or executed */ else { reset_port_task_handle(pthp); + +#if ERTS_POLL_USE_SCHEDULER_POLLING + switch (ptp->type) { + case ERTS_PORT_TASK_INPUT: + case ERTS_PORT_TASK_OUTPUT: + if (ptp->u.alive.td.io.is_scheduler_event) { + ASSERT(erts_atomic_read_nob( + &erts_port_task_outstanding_io_tasks) > 0); + erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); + } + break; + default: + break; + } +#endif + res = 0; } } @@ -1442,7 +1477,14 @@ erts_port_task_schedule(Eterm id, va_list argp; va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); +#if ERTS_POLL_USE_SCHEDULER_POLLING + ptp->u.alive.td.io.is_scheduler_event = va_arg(argp, int); +#endif va_end(argp); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (ptp->u.alive.td.io.is_scheduler_event) + erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); +#endif break; } case ERTS_PORT_TASK_PROC_SIG: { @@ -1621,12 +1663,14 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) int processing_busy_q; int vreds = 0; int reds = 0; - erts_aint_t io_tasks_executed = 0; int fpe_was_unmasked; erts_aint32_t state; int active; Uint64 start_time = 0; ErtsSchedulerData *esdp = runq->scheduler; +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_aint_t io_tasks_executed = 0; +#endif ERTS_MSACC_PUSH_STATE_M(); ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); @@ -1722,8 +1766,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) for input and output */ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); - reset_executed_io_task_handle(ptp); - io_tasks_executed++; + reset_executed_io_task_handle(pp, ptp); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (ptp->u.alive.td.io.is_scheduler_event) + io_tasks_executed++; +#endif break; case ERTS_PORT_TASK_OUTPUT: reds = ERTS_PORT_REDS_OUTPUT; @@ -1732,8 +1779,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) LTTNG_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); - reset_executed_io_task_handle(ptp); - io_tasks_executed++; + reset_executed_io_task_handle(pp, ptp); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (ptp->u.alive.td.io.is_scheduler_event) + io_tasks_executed++; +#endif break; case ERTS_PORT_TASK_PROC_SIG: { ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; @@ -1799,6 +1849,15 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_unblock_fpe(fpe_was_unmasked); ERTS_MSACC_POP_STATE_M(); +#if ERTS_POLL_USE_SCHEDULER_POLLING + if (io_tasks_executed) { + ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks) + >= io_tasks_executed); + erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks, + -1*io_tasks_executed); + } +#endif + ASSERT(runq == erts_get_runq_port(pp)); active = finalize_exec(pp, &execq, processing_busy_q); @@ -2086,6 +2145,10 @@ erts_dequeue_port(ErtsRunQueue *rq) void erts_port_task_init(void) { +#if ERTS_POLL_USE_SCHEDULER_POLLING + erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks, + (erts_aint_t) 0); +#endif init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads + 1); /* aux_thread */ init_busy_caller_table_alloc(); |