diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 114 |
1 files changed, 86 insertions, 28 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index dbc4a06c2d..ce045ec94e 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -685,12 +685,12 @@ enqueue_proc2port_data(Port *pp, void erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp) { - Port *pp = erts_drvport2port(dport, NULL); - ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; + Port *pp = erts_drvport2port(dport); + ErtsPortTaskBusyPortQ *bpq; int written = 0, resume_procs = 0; ErlDrvSizeT low, high; - if (!pp || !bpq) { + if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) { if (lowp) *lowp = ERL_DRV_BUSY_MSGQ_DISABLED; if (highp) @@ -1160,6 +1160,27 @@ select_task_for_exec(Port *pp, } /* + * Cut time slice + */ + +int +erl_drv_consume_timeslice(ErlDrvPort dprt, int percent) +{ + Port *pp = erts_drvport2port(dprt); + if (pp == ERTS_INVALID_ERL_DRV_PORT) + return -1; + if (percent < 1) + percent = 1; + else if (100 < percent) + percent = 100; + pp->reds += percent*((CONTEXT_REDS+99)/100); + if (pp->reds < CONTEXT_REDS) + return 0; + pp->reds = CONTEXT_REDS; + return 1; +} + +/* * Abort a scheduled task. */ @@ -1509,7 +1530,6 @@ fail: void erts_port_task_free_port(Port *pp) { - ErtsProcList *suspended; erts_aint32_t flags; ErtsRunQueue *runq; @@ -1522,19 +1542,16 @@ erts_port_task_free_port(Port *pp) erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_EXIT); - suspended = pp->suspended; - pp->suspended = NULL; erts_port_task_sched_unlock(&pp->sched); erts_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING + (ERTS_PORT_SFLG_CONNECTED + | ERTS_PORT_SFLG_EXITING + | ERTS_PORT_SFLG_CLOSING | ERTS_PORT_SFLG_FREE), ERTS_PORT_SFLG_FREE); erts_smp_runq_unlock(runq); - if (erts_proclist_fetch(&suspended, NULL)) - erts_resume_processes(suspended); - if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) begin_port_cleanup(pp, NULL, NULL); } @@ -1554,7 +1571,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) int processing_busy_q; int res = 0; int vreds = 0; - int reds = ERTS_PORT_REDS_EXECUTE; + int reds = 0; erts_aint_t io_tasks_executed = 0; int fpe_was_unmasked; erts_aint32_t state; @@ -1599,6 +1616,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) fpe_was_unmasked = erts_block_fpe(); state = erts_atomic32_read_nob(&pp->state); + pp->reds = ERTS_PORT_REDS_EXECUTE; goto begin_handle_tasks; while (1) { @@ -1625,14 +1643,14 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: - reds += ERTS_PORT_REDS_TIMEOUT; + reds = ERTS_PORT_REDS_TIMEOUT; if (!(state & ERTS_PORT_SFLGS_DEAD)) { DTRACE_DRIVER(driver_timeout, pp); (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); } break; case ERTS_PORT_TASK_INPUT: - reds += ERTS_PORT_REDS_INPUT; + reds = ERTS_PORT_REDS_INPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ @@ -1641,7 +1659,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: - reds += ERTS_PORT_REDS_OUTPUT; + reds = ERTS_PORT_REDS_OUTPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, @@ -1649,7 +1667,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) io_tasks_executed++; break; case ERTS_PORT_TASK_EVENT: - reds += ERTS_PORT_REDS_EVENT; + reds = ERTS_PORT_REDS_EVENT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, @@ -1661,22 +1679,22 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); if (!pp->sched.taskq.bpq) - reds += ptp->u.alive.td.psig.callback(pp, - state, - ERTS_PROC2PORT_SIG_EXEC, - sigdp); + reds = ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); else { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); - reds += ptp->u.alive.td.psig.callback(pp, - state, - ERTS_PROC2PORT_SIG_EXEC, - sigdp); + reds = ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + sigdp); dequeued_proc2port_data(pp, size); } break; } case ERTS_PORT_TASK_DIST_CMD: - reds += erts_dist_command(pp, CONTEXT_REDS-reds); + reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds); break; default: erl_exit(ERTS_ABORT_EXIT, @@ -1701,7 +1719,10 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) vreds += ERTS_PORT_CALLBACK_VREDS; reds += ERTS_PORT_CALLBACK_VREDS; - if (reds >= CONTEXT_REDS) + pp->reds += reds; + reds = 0; + + if (pp->reds >= CONTEXT_REDS) break; } @@ -1725,6 +1746,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) active = finalize_exec(pp, &execq, processing_busy_q); + reds = pp->reds - vreds; + erts_port_release(pp); *curr_port_pp = NULL; @@ -1770,7 +1793,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) != (erts_aint_t) 0); - reds -= vreds; runq->scheduler->reductions += reds; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); @@ -1793,10 +1815,11 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) int i, max; ErtsPortTaskBusyCallerTable *tabp; ErtsPortTask *qs[3]; + ErtsPortTaskHandleList *free_nshp = NULL; + ErtsProcList *plp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - /* * Abort remaining tasks... * @@ -1935,7 +1958,42 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) erts_smp_atomic32_read_band_nob(&pp->sched.flags, ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS - |ERTS_PTS_FLG_HAVE_TASKS)); + |ERTS_PTS_FLG_HAVE_TASKS + |ERTS_PTS_FLGS_BUSY)); + + erts_port_task_sched_lock(&pp->sched); + + /* Cleanup nosuspend handles... */ + free_nshp = (pp->sched.taskq.local.busy.nosuspend + ? get_free_nosuspend_handles(pp) + : NULL); + ASSERT(!pp->sched.taskq.local.busy.nosuspend); + + /* Make sure not to leave any processes suspended on the port... */ + plp = pp->suspended; + pp->suspended = NULL; + + erts_port_task_sched_unlock(&pp->sched); + + if (free_nshp) + free_nosuspend_handles(free_nshp); + + if (erts_proclist_fetch(&plp, NULL)) { +#ifdef USE_VM_PROBES + if (DTRACE_ENABLED(process_port_unblocked)) { + DTRACE_CHARBUF(port_str, 16); + DTRACE_CHARBUF(pid_str, 16); + ErtsProcList* plp2 = plp; + + erts_snprintf(port_str, sizeof(port_str), "%T", pp->common.id); + while (plp2 != NULL) { + erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); + DTRACE2(process_port_unblocked, pid_str, port_str); + } + } +#endif + erts_resume_processes(plp); + } /* * Schedule cleanup of port structure... |