diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 100 |
1 files changed, 41 insertions, 59 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 6eb8c4869c..112d27c94e 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -51,12 +51,9 @@ #define ERTS_PORT_TASK_INVALID_PORT(P, ID) \ - ((erts_smp_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \ + ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \ || (P)->common.id != (ID)) -#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \ - ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P)) - #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ if (DTRACE_ENABLED(driver_ready_input)) { \ @@ -76,7 +73,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks; struct ErtsPortTaskQueue_ { ErtsPortTask *first; ErtsPortTask *last; - Port *port; }; struct ErtsPortTask_ { @@ -275,7 +271,6 @@ port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp) if (ptqp) { ptqp->first = NULL; ptqp->last = NULL; - ptqp->port = pp; } return ptqp; } @@ -429,7 +424,10 @@ erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp) ErtsPortTask *ptp; Port *pp; - pp = &erts_port[internal_port_index(id)]; + pp = erts_port_lookup_raw(id); + if (!pp) + return 1; + runq = erts_port_runq(pp); if (!runq) return 1; @@ -512,7 +510,10 @@ erts_port_task_schedule(Eterm id, ptp = port_task_alloc(); ASSERT(is_internal_port(id)); - pp = &erts_port[internal_port_index(id)]; + pp = erts_port_lookup_raw(id); + if (!pp) + return -1; + runq = erts_port_runq(pp); if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) { @@ -616,7 +617,7 @@ erts_port_task_free_port(Port *pp) ErtsPortTaskQueue *ptqp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); ASSERT(runq); ERTS_PT_CHK_PRES_PORTQ(runq, pp); @@ -627,14 +628,10 @@ erts_port_task_free_port(Port *pp) ErtsPortTask *ptp; enqueue_free: ptp = port_task_alloc(); - erts_smp_port_minor_lock(pp); - erts_smp_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - erts_may_save_closed_port(pp); - erts_smp_port_minor_unlock(pp); - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 1); + erts_atomic32_read_bset_relb(&pp->state, + (ERTS_PORT_SFLG_CLOSING + | ERTS_PORT_SFLG_FREE_SCHEDULED), + ERTS_PORT_SFLG_FREE_SCHEDULED); ptp->type = ERTS_PORT_TASK_FREE; ptp->event = (ErlDrvEvent) -1; ptp->event_data = NULL; @@ -651,20 +648,18 @@ erts_port_task_free_port(Port *pp) goto enqueue_free; } ASSERT(!pp->sched.taskq); - erts_smp_port_minor_lock(pp); - erts_smp_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - erts_may_save_closed_port(pp); - erts_smp_port_minor_unlock(pp); - erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */ + erts_atomic32_read_bset_relb(&pp->state, + (ERTS_PORT_SFLG_CLOSING + | ERTS_PORT_SFLG_FREE_SCHEDULED), + ERTS_PORT_SFLG_FREE_SCHEDULED); handle_remaining_tasks(runq, pp); /* May release runq lock */ ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first)); pp->sched.taskq = NULL; ERTS_PT_CHK_PRES_PORTQ(runq, pp); erts_smp_runq_unlock(runq); +#ifndef ERTS_SMP + pp->cleanup = 1; +#endif } } @@ -703,14 +698,21 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto done; } + if (erts_smp_port_trylock(pp) == EBUSY) { + erts_smp_runq_unlock(runq); + erts_smp_port_lock(pp); + erts_smp_runq_lock(runq); + } + ASSERT(pp->sched.in_runq); pp->sched.in_runq = 0; + if (!pp->sched.taskq) { if (erts_system_profile_flags.runnable_ports) profile_runnable_port(pp, am_inactive); res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) != (erts_aint_t) 0); - goto done; + goto release_port_done; } *curr_port_pp = pp; @@ -721,12 +723,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ASSERT(!pp->sched.exe_taskq); pp->sched.exe_taskq = ptqp; - - if (erts_smp_port_trylock(pp) == EBUSY) { - erts_smp_runq_unlock(runq); - erts_smp_port_lock(pp); - erts_smp_runq_lock(runq); - } if (erts_sched_stat.enabled) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); @@ -771,31 +767,31 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_runq_lock(runq); erts_unblock_fpe(fpe_was_unmasked); - ASSERT(erts_smp_atomic32_read_nob(&pp->state) + ASSERT(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_FREE_SCHEDULED); if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first)) handle_remaining_tasks(runq, pp); ASSERT(!ptqp->first && (!pp->sched.taskq || !pp->sched.taskq->first)); - erts_smp_atomic32_dec_nob(&pp->common.refc); /* Not alive */ - ERTS_LC_ASSERT(erts_smp_atomic32_read_nob(&pp->common.refc) > 0); /* Lock */ port_task_free(ptp); if (pp->sched.taskq) port_taskq_free(pp->sched.taskq); pp->sched.taskq = NULL; - +#ifndef ERTS_SMP + pp->cleanup = 1; +#endif goto tasks_done; case ERTS_PORT_TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; - if (!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { + if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { DTRACE_DRIVER(driver_timeout, pp); (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); } break; case ERTS_PORT_TASK_INPUT: reds += ERTS_PORT_REDS_INPUT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ @@ -804,7 +800,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; case ERTS_PORT_TASK_OUTPUT: reds += ERTS_PORT_REDS_OUTPUT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event); @@ -812,7 +808,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; case ERTS_PORT_TASK_EVENT: reds += ERTS_PORT_REDS_EVENT; - ASSERT((erts_smp_atomic32_read_nob(&pp->state) + ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data); @@ -828,7 +824,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; } - if ((erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING) + if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(pp)) { reds += ERTS_PORT_REDS_TERMINATE; erts_terminate_port(pp); @@ -879,7 +875,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ErtsRunQueue *xrunq; #endif - ASSERT(!(erts_smp_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); + ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); ASSERT(pp->sched.taskq->first); #ifdef ERTS_SMP @@ -922,23 +918,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_out); } -#ifndef ERTS_SMP + +release_port_done: erts_port_release(pp); -#else - { - erts_aint32_t refc; - erts_smp_mtx_unlock(pp->lock); - refc = erts_smp_atomic32_dec_read_nob(&pp->common.refc); - ASSERT(refc >= 0); - if (refc == 0) { - erts_smp_runq_unlock(runq); - erts_port_cleanup(pp); /* Might aquire runq lock */ - erts_smp_runq_lock(runq); - res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); - } - } -#endif done: ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); |