aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c114
1 files changed, 86 insertions, 28 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index dbc4a06c2d..ce045ec94e 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -685,12 +685,12 @@ enqueue_proc2port_data(Port *pp,
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
- Port *pp = erts_drvport2port(dport, NULL);
- ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ Port *pp = erts_drvport2port(dport);
+ ErtsPortTaskBusyPortQ *bpq;
int written = 0, resume_procs = 0;
ErlDrvSizeT low, high;
- if (!pp || !bpq) {
+ if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
if (lowp)
*lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
if (highp)
@@ -1160,6 +1160,27 @@ select_task_for_exec(Port *pp,
}
/*
+ * Cut time slice
+ */
+
+int
+erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
+{
+ Port *pp = erts_drvport2port(dprt);
+ if (pp == ERTS_INVALID_ERL_DRV_PORT)
+ return -1;
+ if (percent < 1)
+ percent = 1;
+ else if (100 < percent)
+ percent = 100;
+ pp->reds += percent*((CONTEXT_REDS+99)/100);
+ if (pp->reds < CONTEXT_REDS)
+ return 0;
+ pp->reds = CONTEXT_REDS;
+ return 1;
+}
+
+/*
* Abort a scheduled task.
*/
@@ -1509,7 +1530,6 @@ fail:
void
erts_port_task_free_port(Port *pp)
{
- ErtsProcList *suspended;
erts_aint32_t flags;
ErtsRunQueue *runq;
@@ -1522,19 +1542,16 @@ erts_port_task_free_port(Port *pp)
erts_port_task_sched_lock(&pp->sched);
flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_EXIT);
- suspended = pp->suspended;
- pp->suspended = NULL;
erts_port_task_sched_unlock(&pp->sched);
erts_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
+ (ERTS_PORT_SFLG_CONNECTED
+ | ERTS_PORT_SFLG_EXITING
+ | ERTS_PORT_SFLG_CLOSING
| ERTS_PORT_SFLG_FREE),
ERTS_PORT_SFLG_FREE);
erts_smp_runq_unlock(runq);
- if (erts_proclist_fetch(&suspended, NULL))
- erts_resume_processes(suspended);
-
if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
begin_port_cleanup(pp, NULL, NULL);
}
@@ -1554,7 +1571,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
int processing_busy_q;
int res = 0;
int vreds = 0;
- int reds = ERTS_PORT_REDS_EXECUTE;
+ int reds = 0;
erts_aint_t io_tasks_executed = 0;
int fpe_was_unmasked;
erts_aint32_t state;
@@ -1599,6 +1616,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
fpe_was_unmasked = erts_block_fpe();
state = erts_atomic32_read_nob(&pp->state);
+ pp->reds = ERTS_PORT_REDS_EXECUTE;
goto begin_handle_tasks;
while (1) {
@@ -1625,14 +1643,14 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
switch (ptp->type) {
case ERTS_PORT_TASK_TIMEOUT:
- reds += ERTS_PORT_REDS_TIMEOUT;
+ reds = ERTS_PORT_REDS_TIMEOUT;
if (!(state & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
break;
case ERTS_PORT_TASK_INPUT:
- reds += ERTS_PORT_REDS_INPUT;
+ reds = ERTS_PORT_REDS_INPUT;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
@@ -1641,7 +1659,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT:
- reds += ERTS_PORT_REDS_OUTPUT;
+ reds = ERTS_PORT_REDS_OUTPUT;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
(*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
@@ -1649,7 +1667,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
- reds += ERTS_PORT_REDS_EVENT;
+ reds = ERTS_PORT_REDS_EVENT;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
(*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
@@ -1661,22 +1679,22 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
if (!pp->sched.taskq.bpq)
- reds += ptp->u.alive.td.psig.callback(pp,
- state,
- ERTS_PROC2PORT_SIG_EXEC,
- sigdp);
+ reds = ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
else {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
- reds += ptp->u.alive.td.psig.callback(pp,
- state,
- ERTS_PROC2PORT_SIG_EXEC,
- sigdp);
+ reds = ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
dequeued_proc2port_data(pp, size);
}
break;
}
case ERTS_PORT_TASK_DIST_CMD:
- reds += erts_dist_command(pp, CONTEXT_REDS-reds);
+ reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
break;
default:
erl_exit(ERTS_ABORT_EXIT,
@@ -1701,7 +1719,10 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
vreds += ERTS_PORT_CALLBACK_VREDS;
reds += ERTS_PORT_CALLBACK_VREDS;
- if (reds >= CONTEXT_REDS)
+ pp->reds += reds;
+ reds = 0;
+
+ if (pp->reds >= CONTEXT_REDS)
break;
}
@@ -1725,6 +1746,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
active = finalize_exec(pp, &execq, processing_busy_q);
+ reds = pp->reds - vreds;
+
erts_port_release(pp);
*curr_port_pp = NULL;
@@ -1770,7 +1793,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
!= (erts_aint_t) 0);
- reds -= vreds;
runq->scheduler->reductions += reds;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
@@ -1793,10 +1815,11 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
int i, max;
ErtsPortTaskBusyCallerTable *tabp;
ErtsPortTask *qs[3];
+ ErtsPortTaskHandleList *free_nshp = NULL;
+ ErtsProcList *plp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
-
/*
* Abort remaining tasks...
*
@@ -1935,7 +1958,42 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
erts_smp_atomic32_read_band_nob(&pp->sched.flags,
~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
- |ERTS_PTS_FLG_HAVE_TASKS));
+ |ERTS_PTS_FLG_HAVE_TASKS
+ |ERTS_PTS_FLGS_BUSY));
+
+ erts_port_task_sched_lock(&pp->sched);
+
+ /* Cleanup nosuspend handles... */
+ free_nshp = (pp->sched.taskq.local.busy.nosuspend
+ ? get_free_nosuspend_handles(pp)
+ : NULL);
+ ASSERT(!pp->sched.taskq.local.busy.nosuspend);
+
+ /* Make sure not to leave any processes suspended on the port... */
+ plp = pp->suspended;
+ pp->suspended = NULL;
+
+ erts_port_task_sched_unlock(&pp->sched);
+
+ if (free_nshp)
+ free_nosuspend_handles(free_nshp);
+
+ if (erts_proclist_fetch(&plp, NULL)) {
+#ifdef USE_VM_PROBES
+ if (DTRACE_ENABLED(process_port_unblocked)) {
+ DTRACE_CHARBUF(port_str, 16);
+ DTRACE_CHARBUF(pid_str, 16);
+ ErtsProcList* plp2 = plp;
+
+ erts_snprintf(port_str, sizeof(port_str), "%T", pp->common.id);
+ while (plp2 != NULL) {
+ erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ DTRACE2(process_port_unblocked, pid_str, port_str);
+ }
+ }
+#endif
+ erts_resume_processes(plp);
+ }
/*
* Schedule cleanup of port structure...