aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c100
1 files changed, 41 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 6eb8c4869c..112d27c94e 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -51,12 +51,9 @@
#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
- ((erts_smp_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \
+ ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \
|| (P)->common.id != (ID))
-#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \
- ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P))
-
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
if (DTRACE_ENABLED(driver_ready_input)) { \
@@ -76,7 +73,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
struct ErtsPortTaskQueue_ {
ErtsPortTask *first;
ErtsPortTask *last;
- Port *port;
};
struct ErtsPortTask_ {
@@ -275,7 +271,6 @@ port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp)
if (ptqp) {
ptqp->first = NULL;
ptqp->last = NULL;
- ptqp->port = pp;
}
return ptqp;
}
@@ -429,7 +424,10 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
ErtsPortTask *ptp;
Port *pp;
- pp = &erts_port[internal_port_index(id)];
+ pp = erts_port_lookup_raw(id);
+ if (!pp)
+ return 1;
+
runq = erts_port_runq(pp);
if (!runq)
return 1;
@@ -512,7 +510,10 @@ erts_port_task_schedule(Eterm id,
ptp = port_task_alloc();
ASSERT(is_internal_port(id));
- pp = &erts_port[internal_port_index(id)];
+ pp = erts_port_lookup_raw(id);
+ if (!pp)
+ return -1;
+
runq = erts_port_runq(pp);
if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) {
@@ -616,7 +617,7 @@ erts_port_task_free_port(Port *pp)
ErtsPortTaskQueue *ptqp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
ASSERT(runq);
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
@@ -627,14 +628,10 @@ erts_port_task_free_port(Port *pp)
ErtsPortTask *ptp;
enqueue_free:
ptp = port_task_alloc();
- erts_smp_port_minor_lock(pp);
- erts_smp_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- erts_may_save_closed_port(pp);
- erts_smp_port_minor_unlock(pp);
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 1);
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE_SCHEDULED),
+ ERTS_PORT_SFLG_FREE_SCHEDULED);
ptp->type = ERTS_PORT_TASK_FREE;
ptp->event = (ErlDrvEvent) -1;
ptp->event_data = NULL;
@@ -651,20 +648,18 @@ erts_port_task_free_port(Port *pp)
goto enqueue_free;
}
ASSERT(!pp->sched.taskq);
- erts_smp_port_minor_lock(pp);
- erts_smp_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- erts_may_save_closed_port(pp);
- erts_smp_port_minor_unlock(pp);
- erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE_SCHEDULED),
+ ERTS_PORT_SFLG_FREE_SCHEDULED);
handle_remaining_tasks(runq, pp); /* May release runq lock */
ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
pp->sched.taskq = NULL;
ERTS_PT_CHK_PRES_PORTQ(runq, pp);
erts_smp_runq_unlock(runq);
+#ifndef ERTS_SMP
+ pp->cleanup = 1;
+#endif
}
}
@@ -703,14 +698,21 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
goto done;
}
+ if (erts_smp_port_trylock(pp) == EBUSY) {
+ erts_smp_runq_unlock(runq);
+ erts_smp_port_lock(pp);
+ erts_smp_runq_lock(runq);
+ }
+
ASSERT(pp->sched.in_runq);
pp->sched.in_runq = 0;
+
if (!pp->sched.taskq) {
if (erts_system_profile_flags.runnable_ports)
profile_runnable_port(pp, am_inactive);
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
!= (erts_aint_t) 0);
- goto done;
+ goto release_port_done;
}
*curr_port_pp = pp;
@@ -721,12 +723,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ASSERT(!pp->sched.exe_taskq);
pp->sched.exe_taskq = ptqp;
-
- if (erts_smp_port_trylock(pp) == EBUSY) {
- erts_smp_runq_unlock(runq);
- erts_smp_port_lock(pp);
- erts_smp_runq_lock(runq);
- }
if (erts_sched_stat.enabled) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -771,31 +767,31 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_runq_lock(runq);
erts_unblock_fpe(fpe_was_unmasked);
- ASSERT(erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT(erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_FREE_SCHEDULED);
if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first))
handle_remaining_tasks(runq, pp);
ASSERT(!ptqp->first
&& (!pp->sched.taskq || !pp->sched.taskq->first));
- erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */
port_task_free(ptp);
if (pp->sched.taskq)
port_taskq_free(pp->sched.taskq);
pp->sched.taskq = NULL;
-
+#ifndef ERTS_SMP
+ pp->cleanup = 1;
+#endif
goto tasks_done;
case ERTS_PORT_TASK_TIMEOUT:
reds += ERTS_PORT_REDS_TIMEOUT;
- if (!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
+ if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
break;
case ERTS_PORT_TASK_INPUT:
reds += ERTS_PORT_REDS_INPUT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
@@ -804,7 +800,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
case ERTS_PORT_TASK_OUTPUT:
reds += ERTS_PORT_REDS_OUTPUT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
(*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
@@ -812,7 +808,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
case ERTS_PORT_TASK_EVENT:
reds += ERTS_PORT_REDS_EVENT;
- ASSERT((erts_smp_atomic32_read_nob(&pp->state)
+ ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
(*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
@@ -828,7 +824,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
}
- if ((erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING)
+ if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING)
&& erts_is_port_ioq_empty(pp)) {
reds += ERTS_PORT_REDS_TERMINATE;
erts_terminate_port(pp);
@@ -879,7 +875,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ErtsRunQueue *xrunq;
#endif
- ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
ASSERT(pp->sched.taskq->first);
#ifdef ERTS_SMP
@@ -922,23 +918,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_out);
}
-#ifndef ERTS_SMP
+
+release_port_done:
erts_port_release(pp);
-#else
- {
- erts_aint32_t refc;
- erts_smp_mtx_unlock(pp->lock);
- refc = erts_smp_atomic32_dec_read_nob(&pp->common.refc);
- ASSERT(refc >= 0);
- if (refc == 0) {
- erts_smp_runq_unlock(runq);
- erts_port_cleanup(pp); /* Might aquire runq lock */
- erts_smp_runq_lock(runq);
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
- }
- }
-#endif
done:
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));