/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Scheduling of port tasks
 *
 * Author: Rickard Green
 */

#define ERL_PORT_TASK_C__

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "global.h"
#include "erl_port_task.h"
#include "dist.h"
#include "dtrace-wrapper.h"

/* Heavy queue-consistency checking; deliberately disabled ("&& 0") even
 * in DEBUG builds because of its cost. */
#if defined(DEBUG) && 0
#define HARD_DEBUG
#endif

/*
 * Costs in reductions for some port operations.  These are charged
 * against the run queue after a port has executed (see
 * ERTS_PORT_REDUCTIONS_EXECUTED at the end of erts_port_task_execute()).
 */
#define ERTS_PORT_REDS_EXECUTE 0
#define ERTS_PORT_REDS_FREE 50
#define ERTS_PORT_REDS_TIMEOUT 200
#define ERTS_PORT_REDS_INPUT 200
#define ERTS_PORT_REDS_OUTPUT 200
#define ERTS_PORT_REDS_EVENT 200
#define ERTS_PORT_REDS_TERMINATE 100

/* A port is invalid for scheduling purposes when it is dead or when the
 * slot in erts_port[] has been reused for another port id. */
#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
    ((erts_port_status_get((P)) & ERTS_PORT_SFLGS_DEAD) || (P)->id != (ID))

#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \
    ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P))

#ifdef USE_VM_PROBES
/* NOTE(review): the enable-check is hard-wired to driver_ready_input for
 * every PROBE_NAME — presumably intentional as a cheap shared gate, but
 * it means e.g. the driver_timeout probe only fires when
 * driver_ready_input tracing is enabled; verify against the dtrace
 * provider definition. */
#define DTRACE_DRIVER(PROBE_NAME, PP)                               \
    if (DTRACE_ENABLED(driver_ready_input)) {                       \
        DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);          \
        DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);             \
                                                                    \
        dtrace_pid_str(PP->connected, process_str);                 \
        dtrace_port_str(PP, port_str);                              \
        DTRACE3(PROBE_NAME, process_str, port_str, PP->name);       \
    }
#else
#define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
#endif

/* Number of outstanding input/output/event tasks across all run queues;
 * incremented in erts_port_task_schedule() and decremented on execution
 * or abort.  Used by schedulers to decide whether I/O work remains. */
erts_smp_atomic_t erts_port_task_outstanding_io_tasks;

/* Doubly-ended list of tasks belonging to one port. */
struct ErtsPortTaskQueue_ {
    ErtsPortTask *first;
    ErtsPortTask *last;
    Port *port;
};

/* One scheduled unit of port work (input, output, timeout, ...). */
struct ErtsPortTask_ {
    ErtsPortTask *prev;
    ErtsPortTask *next;
    ErtsPortTaskQueue *queue;   /* queue the task currently lives on */
    ErtsPortTaskHandle *handle; /* caller-owned handle for abort; may be NULL */
    ErtsPortTaskType type;
    ErlDrvEvent event;
    ErlDrvEventData event_data;
};

#ifdef HARD_DEBUG
#define ERTS_PT_CHK_PORTQ(RQ) check_port_queue((RQ), NULL, 0)
#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP) check_port_queue((RQ), (PP), -1)
#define ERTS_PT_CHK_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 1)
#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 0)
#define ERTS_PT_CHK_TASKQ(Q) check_task_queue((Q), NULL, 0)
#define ERTS_PT_CHK_IN_TASKQ(Q, T) check_task_queue((Q), (T), 1)
#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T) check_task_queue((Q), (T), 0)
/* NOTE(review): this prototype does not match the definition below,
 * which takes (ErtsRunQueue *runq, Port *chk_pp, int inq).  HARD_DEBUG
 * builds would fail to compile as-is — confirm and fix the declaration
 * when HARD_DEBUG is next enabled. */
static void check_port_queue(Port *chk_pp, int inq);
static void check_task_queue(ErtsPortTaskQueue *ptqp,
                             ErtsPortTask *chk_ptp,
                             int inq);
#else
#define ERTS_PT_CHK_PORTQ(RQ)
#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP)
#define ERTS_PT_CHK_IN_PORTQ(RQ, PP)
#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP)
#define ERTS_PT_CHK_TASKQ(Q)
#define ERTS_PT_CHK_IN_TASKQ(Q, T)
#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T)
#endif

static void handle_remaining_tasks(ErtsRunQueue *runq, Port *pp);

/* Scheduler-preferred quick allocators for tasks and task queues. */
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
                                 ErtsPortTask,
                                 200,
                                 ERTS_ALC_T_PORT_TASK)
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_taskq,
                                 ErtsPortTaskQueue,
                                 50,
                                 ERTS_ALC_T_PORT_TASKQ)

/*
 * Task handle manipulation.
 *
 * A task handle is an atomic word owned by the caller that either holds
 * a pointer to the scheduled task or NULL.  It is what allows
 * erts_port_task_abort() to find (and erts_port_task_is_scheduled() to
 * detect) a pending task.
 */

/* Read the task currently referenced by a handle (NULL if none). */
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
    return (ErtsPortTask *) erts_smp_atomic_read_nob(pthp);
}

/* Detach a task from its handle (if it has one) before it is executed,
 * aborted or discarded, so the handle no longer refers to it. */
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
    if (ptp->handle) {
        ASSERT(ptp == handle2task(ptp->handle));
        erts_smp_atomic_set_nob(ptp->handle, (erts_aint_t) NULL);
    }
}

/* Attach a task to a (possibly NULL) caller-supplied handle. */
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->handle = pthp;
    if (pthp) {
        erts_smp_atomic_set_nob(pthp, (erts_aint_t) ptp);
        ASSERT(ptp == handle2task(ptp->handle));
    }
}

/*
 * Port queue operations
 *
 * The port run queue is a singly-linked FIFO threaded through
 * pp->sched.next with explicit start/end pointers in the run queue.
 * The run queue lock must be held for all of these.
 */

/* Append a port to the tail of the run queue and bump the queue length. */
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
    pp->sched.next = NULL;
    pp->sched.in_runq = 1;
    if (runq->ports.end) {
        ASSERT(runq->ports.start);
        runq->ports.end->sched.next = pp;
    }
    else {
        ASSERT(!runq->ports.start);
        runq->ports.start = pp;
    }
    runq->ports.end = pp;
    ASSERT(runq->ports.start && runq->ports.end);
    erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}

/* Remove and return the head of the run queue, or NULL if empty. */
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
    Port *pp = runq->ports.start;
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
    if (!pp) {
        ASSERT(!runq->ports.end);
    }
    else {
        runq->ports.start = runq->ports.start->sched.next;
        if (!runq->ports.start) {
            ASSERT(runq->ports.end == pp);
            runq->ports.end = NULL;
        }
        erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
    }
    /* start and end must be NULL or non-NULL together */
    ASSERT(runq->ports.start || !runq->ports.end);
    ASSERT(runq->ports.end || !runq->ports.start);
    return pp;
}

#ifdef HARD_DEBUG

/* Walk the port run queue forwards and backwards verifying its
 * invariants.  With chk_pp set, also verify presence/absence of that
 * port: inq > 0 expects present, inq == 0 expects absent, inq < 0
 * derives the expectation from the port's own scheduling state.
 * NOTE(review): the forward walk traverses sched.next but the backward
 * walk uses sched.prev, while enqueue_port() never sets sched.prev —
 * presumably this predates the singly-linked representation; confirm
 * before re-enabling HARD_DEBUG. */
static void
check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq)
{
    Port *pp;
    Port *last_pp;
    Port *first_pp = runq->ports.start;
    int no_forward = 0, no_backward = 0;
    int found_forward = 0, found_backward = 0;
    if (!first_pp) {
        ASSERT(!runq->ports.end);
    }
    else {
        ASSERT(!first_pp->sched.prev);
        for (pp = first_pp; pp; pp = pp->sched.next) {
            ASSERT(pp->sched.taskq);
            if (pp->sched.taskq->first)
                no_forward++;
            if (chk_pp == pp)
                found_forward = 1;
            if (!pp->sched.prev) {
                ASSERT(first_pp == pp);
            }
            if (!pp->sched.next) {
                ASSERT(runq->ports.end == pp);
                last_pp = pp;
            }
        }
        for (pp = last_pp; pp; pp = pp->sched.prev) {
            ASSERT(pp->sched.taskq);
            if (pp->sched.taskq->last)
                no_backward++;
            if (chk_pp == pp)
                found_backward = 1;
            if (!pp->sched.prev) {
                ASSERT(first_pp == pp);
            }
            if (!pp->sched.next) {
                ASSERT(runq->ports.end == pp);
            }
            check_task_queue(pp->sched.taskq, NULL, 0);
        }
        ASSERT(no_forward == no_backward);
    }
    ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len));
    if (chk_pp) {
        if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) {
            /* A queue is never both pending and executing */
            ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq);
        }
        ASSERT(!chk_pp->sched.taskq || chk_pp->sched.taskq->first);
        if (inq < 0)
            inq = chk_pp->sched.taskq && !chk_pp->sched.exe_taskq;
        if (inq) {
            ASSERT(found_forward && found_backward);
        }
        else {
            ASSERT(!found_forward && !found_backward);
        }
    }
}

#endif

/*
 * Task queue operations
 *
 * Task queues are doubly-linked lists of ErtsPortTask.  Callers are
 * expected to hold the run queue lock (enforced indirectly via the
 * HARD_DEBUG checks and the callers' own lock-checker asserts).
 */

/* Initialize a freshly allocated task queue for port pp.  Tolerates a
 * NULL ptqp (allocation failure passthrough) and returns its argument. */
static ERTS_INLINE ErtsPortTaskQueue *
port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp)
{
    if (ptqp) {
        ptqp->first = NULL;
        ptqp->last = NULL;
        ptqp->port = pp;
    }
    return ptqp;
}

/* Append a task at the tail of a task queue. */
static ERTS_INLINE void
enqueue_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
{
    ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
    ptp->next = NULL;
    ptp->prev = ptqp->last;
    ptp->queue = ptqp;
    if (ptqp->last) {
        ASSERT(ptqp->first);
        ptqp->last->next = ptp;
    }
    else {
        ASSERT(!ptqp->first);
        ptqp->first = ptp;
    }
    ptqp->last = ptp;
    ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
}

/* Prepend a task at the head of a task queue.  Used only for FREE
 * tasks, which must run before anything else already queued. */
static ERTS_INLINE void
push_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
{
    ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
    ptp->next = ptqp->first;
    ptp->prev = NULL;
    ptp->queue = ptqp;
    if (ptqp->first) {
        ASSERT(ptqp->last);
        ptqp->first->prev = ptp;
    }
    else {
        ASSERT(!ptqp->last);
        ptqp->last = ptp;
    }
    ptqp->first = ptp;
    ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
}

/* Unlink a task from whatever queue it is on (used by abort). */
static ERTS_INLINE void
dequeue_task(ErtsPortTask *ptp)
{
    ASSERT(ptp);
    ASSERT(ptp->queue);
    ERTS_PT_CHK_IN_TASKQ(ptp->queue, ptp);
    if (ptp->next)
        ptp->next->prev = ptp->prev;
    else {
        ASSERT(ptp->queue->last == ptp);
        ptp->queue->last = ptp->prev;
    }
    if (ptp->prev)
        ptp->prev->next = ptp->next;
    else {
        ASSERT(ptp->queue->first == ptp);
        ptp->queue->first = ptp->next;
    }
    ASSERT(ptp->queue->first || !ptp->queue->last);
    ASSERT(ptp->queue->last || !ptp->queue->first);
    ERTS_PT_CHK_NOT_IN_TASKQ(ptp->queue, ptp);
}

/* Remove and return the head task of a queue, or NULL if empty. */
static ERTS_INLINE ErtsPortTask *
pop_task(ErtsPortTaskQueue *ptqp)
{
    ErtsPortTask *ptp = ptqp->first;
    if (!ptp) {
        ASSERT(!ptqp->last);
    }
    else {
        ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
        ASSERT(!ptp->prev);
        ptqp->first = ptp->next;
        if (ptqp->first)
            ptqp->first->prev = NULL;
        else {
            ASSERT(ptqp->last == ptp);
            ptqp->last = NULL;
        }
        ASSERT(ptp->queue->first || !ptp->queue->last);
        ASSERT(ptp->queue->last || !ptp->queue->first);
    }
    ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
    return ptp;
}

#ifdef HARD_DEBUG
/* Verify a task queue's doubly-linked invariants by walking it in both
 * directions; with chk_ptp set, verify its presence (inq != 0) or
 * absence (inq == 0). */
static void
check_task_queue(ErtsPortTaskQueue *ptqp, ErtsPortTask *chk_ptp, int inq)
{
    ErtsPortTask *ptp;
    ErtsPortTask *last_ptp;
    ErtsPortTask *first_ptp = ptqp->first;
    int found_forward = 0, found_backward = 0;
    if (!first_ptp) {
        ASSERT(!ptqp->last);
    }
    else {
        ASSERT(!first_ptp->prev);
        for (ptp = first_ptp; ptp; ptp = ptp->next) {
            ASSERT(ptp->queue == ptqp);
            if (chk_ptp == ptp)
                found_forward = 1;
            if (!ptp->prev) {
                ASSERT(first_ptp == ptp);
            }
            if (!ptp->next) {
                ASSERT(ptqp->last == ptp);
                last_ptp = ptp;
            }
        }
        for (ptp = last_ptp; ptp; ptp = ptp->prev) {
            ASSERT(ptp->queue == ptqp);
            if (chk_ptp == ptp)
                found_backward = 1;
            if (!ptp->prev) {
                ASSERT(first_ptp == ptp);
            }
            if (!ptp->next) {
                ASSERT(ptqp->last == ptp);
            }
        }
    }
    if (chk_ptp) {
        if (inq) {
            ASSERT(found_forward && found_backward);
        }
        else {
            ASSERT(!found_forward && !found_backward);
        }
    }
}
#endif

/*
 * Abort a scheduled task.
 *
 * id is the port id the task was scheduled for, pthp the handle passed
 * to erts_port_task_schedule().  Returns 0 when the task was found and
 * aborted, 1 when there was nothing to abort (no run queue for the
 * port, or the handle no longer references a task).  Also releases the
 * task's queue if the abort emptied the pending (non-executing) queue.
 */
int
erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
{
    ErtsRunQueue *runq;
    ErtsPortTaskQueue *ptqp;
    ErtsPortTask *ptp;
    Port *pp;

    pp = &erts_port[internal_port_index(id)];
    runq = erts_port_runq(pp); /* returns with runq locked on success */
    if (!runq)
        return 1;

    ptp = handle2task(pthp);

    if (!ptp) {
        erts_smp_runq_unlock(runq);
        return 1;
    }

    ASSERT(ptp->handle == pthp);
    ptqp = ptp->queue;
    ASSERT(pp == ptqp->port);

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);
    ASSERT(ptqp);
    ASSERT(ptqp->first);

    dequeue_task(ptp);
    reset_handle(ptp);

    switch (ptp->type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT:
    case ERTS_PORT_TASK_EVENT:
        /* This task was counted as outstanding I/O when scheduled */
        ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) > 0);
        erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
        break;
    default:
        break;
    }

    ASSERT(ptqp == pp->sched.taskq || ptqp == pp->sched.exe_taskq);

    /* Free the queue too, but only if it is the (now empty) pending
     * queue; an executing queue is owned and freed by the executor. */
    if (ptqp->first || pp->sched.taskq != ptqp)
        ptqp = NULL;
    else
        pp->sched.taskq = NULL;

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);

    erts_smp_runq_unlock(runq);

    port_task_free(ptp);
    if (ptqp)
        port_taskq_free(ptqp);

    return 0;
}

/*
 * Schedule a task.
 *
 * Allocates a task of the given type for port id, optionally registers
 * it with handle pthp (for later abort), appends it to the port's
 * pending task queue, and enqueues the port on a run queue if it was
 * not already runnable.  I/O-type tasks (input/output/event) are
 * counted in erts_port_task_outstanding_io_tasks.  Returns 0 on
 * success, -1 if the port is invalid/dead or its run queue cannot be
 * obtained.  FREE tasks must never be scheduled through this function
 * (use erts_port_task_free_port()).
 */
int
erts_port_task_schedule(Eterm id,
                        ErtsPortTaskHandle *pthp,
                        ErtsPortTaskType type,
                        ErlDrvEvent event,
                        ErlDrvEventData event_data)
{
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp;
    int enq_port = 0;

    /*
     * NOTE: We might not have the port lock here. We are only
     *       allowed to access the 'sched', 'tab_status',
     *       and 'id' fields of the port struct while
     *       tasks_lock is held.
     */

    if (pthp && erts_port_task_is_scheduled(pthp)) {
        ASSERT(0); /* double-schedule on a live handle is a caller bug */
        erts_port_task_abort(id, pthp);
    }

    ptp = port_task_alloc();

    ASSERT(is_internal_port(id));
    pp = &erts_port[internal_port_index(id)];
    runq = erts_port_runq(pp); /* returns with runq locked on success */

    if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) {
        if (runq)
            erts_smp_runq_unlock(runq);
        return -1;
    }

    ASSERT(!erts_port_task_is_scheduled(pthp));

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);

    if (!pp->sched.taskq) {
        pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
        /* Only enqueue the port if it is neither already in a run
         * queue nor currently executing */
        enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq;
    }

#ifdef ERTS_SMP
    if (enq_port) {
        ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
        if (xrunq) {
            /* Port emigrated ... */
            erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
            erts_smp_runq_unlock(runq);
            runq = erts_port_runq(pp);
            if (!runq)
                return -1;
        }
    }
#endif

    ASSERT(pp->sched.taskq);
    ASSERT(ptp);

    ptp->type = type;
    ptp->event = event;
    ptp->event_data = event_data;

    set_handle(ptp, pthp);

    switch (type) {
    case ERTS_PORT_TASK_FREE:
        erl_exit(ERTS_ABORT_EXIT,
                 "erts_port_task_schedule(): Cannot schedule free task\n");
        break;
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT:
    case ERTS_PORT_TASK_EVENT:
        erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
        /* Fall through... */
    default:
        enqueue_task(pp->sched.taskq, ptp);
        break;
    }

#ifndef ERTS_SMP
    /*
     * When (!enq_port && !pp->sched.exe_taskq) is true in the smp case,
     * the port might not be in the run queue. If this is the case, another
     * thread is in the process of enqueueing the port. This very seldom
     * occur, but do occur and is a valid scenario. Debug info showing this
     * enqueue in progress must be introduced before we can enable (modified
     * versions of these) assertions in the smp case again.
     */
#if defined(HARD_DEBUG)
    if (pp->sched.exe_taskq || enq_port)
        ERTS_PT_CHK_NOT_IN_PORTQ(runq, pp);
    else
        ERTS_PT_CHK_IN_PORTQ(runq, pp);
#elif defined(DEBUG)
    if (!enq_port && !pp->sched.exe_taskq) {
        /* We should be in port run q */
        ASSERT(pp->sched.in_runq);
    }
#endif
#endif

    if (!enq_port) {
        ERTS_PT_CHK_PRES_PORTQ(runq, pp);
        erts_smp_runq_unlock(runq);
    }
    else {
        enqueue_port(runq, pp);
        ERTS_PT_CHK_PRES_PORTQ(runq, pp);

        if (erts_system_profile_flags.runnable_ports) {
            profile_runnable_port(pp, am_active);
        }

        erts_smp_runq_unlock(runq);

        erts_smp_notify_inc_runq(runq);
    }
    return 0;
}

/*
 * Mark a port as free-scheduled and make sure a FREE task will run for
 * it.  Caller must hold the port lock; the port must not already be
 * dead.  Three cases:
 *   - this thread is executing the port: push a FREE task first on the
 *     executing queue, so the port is freed when scheduled out;
 *   - the port sits in a run queue: push a FREE task first on its
 *     pending queue (creating one if needed);
 *   - otherwise: flush any remaining tasks immediately and drop the
 *     "alive" reference here.
 */
void
erts_port_task_free_port(Port *pp)
{
    ErtsRunQueue *runq;
    ErtsPortTaskQueue *ptqp;

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
    runq = erts_port_runq(pp); /* returns with runq locked */
    ASSERT(runq);
    ERTS_PT_CHK_PRES_PORTQ(runq, pp);
    ptqp = pp->sched.exe_taskq;
    if (ptqp) {
        /* I (this thread) am currently executing this port, free it
           when scheduled out... */
        ErtsPortTask *ptp;
    enqueue_free:
        ptp = port_task_alloc();
        erts_smp_port_state_lock(pp);
        pp->status &= ~ERTS_PORT_SFLG_CLOSING;
        pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
        erts_may_save_closed_port(pp);
        erts_smp_port_state_unlock(pp);
        ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1);
        ptp->type = ERTS_PORT_TASK_FREE;
        ptp->event = (ErlDrvEvent) -1;
        ptp->event_data = NULL;
        set_handle(ptp, NULL);
        /* FREE must run before anything already queued */
        push_task(ptqp, ptp);
        ERTS_PT_CHK_PRES_PORTQ(runq, pp);
        erts_smp_runq_unlock(runq);
    }
    else {
        if (pp->sched.in_runq) {
            ptqp = pp->sched.taskq;
            if (!ptqp)
                pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp);
            goto enqueue_free;
        }
        ASSERT(!pp->sched.taskq);
        erts_smp_port_state_lock(pp);
        pp->status &= ~ERTS_PORT_SFLG_CLOSING;
        pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
        erts_may_save_closed_port(pp);
        erts_smp_port_state_unlock(pp);
        erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
        ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
        handle_remaining_tasks(runq, pp); /* May release runq lock */
        ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
        pp->sched.taskq = NULL;
        ERTS_PT_CHK_PRES_PORTQ(runq, pp);
        erts_smp_runq_unlock(runq);
    }
}

/* NOTE(review): this type appears unused within this file — presumably
 * kept for an external or historical user; verify before removing. */
typedef struct {
    ErtsRunQueue *runq;
    int *resp;
} ErtsPortTaskExeBlockData;

/*
 * Run all scheduled tasks for the first port in run queue. If
 * new tasks appear while running reschedule port (free task is
 * an exception; it is always handled instantly).
 *
 * erts_port_task_execute() is called by scheduler threads between
 * scheduling of processes. Sched lock should be held by caller.
 *
 * On return, the result is non-zero iff outstanding I/O port tasks
 * remain in the system; *curr_port_pp is set to the executing port for
 * the duration of the dispatch loop and reset to NULL afterwards.
 * The run queue lock is dropped and re-taken around each driver
 * callback.
 */
int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    Port *pp;
    ErtsPortTaskQueue *ptqp;
    ErtsPortTask *ptp;
    int res = 0;
    int reds = ERTS_PORT_REDS_EXECUTE;
    erts_aint_t io_tasks_executed = 0;
    int fpe_was_unmasked;

    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    ERTS_PT_CHK_PORTQ(runq);

    pp = pop_port(runq);
    if (!pp) {
        res = 0;
        goto done;
    }

    ASSERT(pp->sched.in_runq);
    pp->sched.in_runq = 0;
    if (!pp->sched.taskq) {
        /* Nothing to run (tasks were aborted after enqueue) */
        if (erts_system_profile_flags.runnable_ports)
            profile_runnable_port(pp, am_inactive);
        res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
               != (erts_aint_t) 0);
        goto done;
    }

    *curr_port_pp = pp;

    /* Detach the pending queue and install it as the executing queue;
     * tasks scheduled from here on land on a fresh pending queue. */
    ASSERT(pp->sched.taskq->first);
    ptqp = pp->sched.taskq;
    pp->sched.taskq = NULL;

    ASSERT(!pp->sched.exe_taskq);
    pp->sched.exe_taskq = ptqp;

    if (erts_smp_port_trylock(pp) == EBUSY) {
        /* Avoid lock-order violation: take the port lock with the run
         * queue lock released, then re-take the run queue lock. */
        erts_smp_runq_unlock(runq);
        erts_smp_port_lock(pp);
        erts_smp_runq_lock(runq);
    }

    if (erts_sched_stat.enabled) {
        ErtsSchedulerData *esdp = erts_get_scheduler_data();
        Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
        int migrated = old && old != esdp->no;

        erts_smp_spin_lock(&erts_sched_stat.lock);
        erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
        erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
        if (migrated) {
            erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
            erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
        }
        erts_smp_spin_unlock(&erts_sched_stat.lock);
    }

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports(pp, am_in);
    }

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);
    ptp = pop_task(ptqp);

    fpe_was_unmasked = erts_block_fpe();

    /* Dispatch loop: run queue lock is dropped while each driver
     * callback executes and re-taken before the next pop. */
    while (ptp) {
        ASSERT(pp->sched.taskq != pp->sched.exe_taskq);

        reset_handle(ptp);
        erts_smp_runq_unlock(runq);

        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
        ERTS_SMP_CHK_NO_PROC_LOCKS;
        ASSERT(pp->drv_ptr);

        switch (ptp->type) {
        case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */
            reds += ERTS_PORT_REDS_FREE;
            erts_smp_runq_lock(runq);

            erts_unblock_fpe(fpe_was_unmasked);
            ASSERT(pp->status & ERTS_PORT_SFLG_FREE_SCHEDULED);
            if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first))
                handle_remaining_tasks(runq, pp);
            ASSERT(!ptqp->first
                   && (!pp->sched.taskq || !pp->sched.taskq->first));

            erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
            ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */

            port_task_free(ptp);
            if (pp->sched.taskq)
                port_taskq_free(pp->sched.taskq);
            pp->sched.taskq = NULL;

            goto tasks_done;
        case ERTS_PORT_TASK_TIMEOUT:
            reds += ERTS_PORT_REDS_TIMEOUT;
            if (!(pp->status & ERTS_PORT_SFLGS_DEAD)) {
                DTRACE_DRIVER(driver_timeout, pp);
                (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
            }
            break;
        case ERTS_PORT_TASK_INPUT:
            reds += ERTS_PORT_REDS_INPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_input, pp);
            /* NOTE some windows drivers use ->ready_input for input and output */
            (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds += ERTS_PORT_REDS_OUTPUT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_output, pp);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_EVENT:
            reds += ERTS_PORT_REDS_EVENT;
            ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_event, pp);
            (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
                                  ptp->event,
                                  ptp->event_data);
            io_tasks_executed++;
            break;
        case ERTS_PORT_TASK_DIST_CMD:
            /* Distribution command charges its own reduction cost */
            reds += erts_dist_command(pp, CONTEXT_REDS-reds);
            break;
        default:
            erl_exit(ERTS_ABORT_EXIT,
                     "Invalid port task type: %d\n",
                     (int) ptp->type);
            break;
        }

        if ((pp->status & ERTS_PORT_SFLG_CLOSING)
            && erts_is_port_ioq_empty(pp)) {
            reds += ERTS_PORT_REDS_TERMINATE;
            erts_terminate_port(pp);
        }

        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

#ifdef ERTS_SMP
        if (pp->xports)
            erts_smp_xports_unlock(pp);
        ASSERT(!pp->xports);
#endif

        ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

        port_task_free(ptp);

        erts_smp_runq_lock(runq);

        ptp = pop_task(ptqp);
    }

 tasks_done:

    erts_unblock_fpe(fpe_was_unmasked);

    if (io_tasks_executed) {
        ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
               >= io_tasks_executed);
        erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
                                 -1*io_tasks_executed);
    }

    *curr_port_pp = NULL;

#ifdef ERTS_SMP
    ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif

    if (!pp->sched.taskq) {
        ASSERT(pp->sched.exe_taskq);
        pp->sched.exe_taskq = NULL;
        if (erts_system_profile_flags.runnable_ports)
            profile_runnable_port(pp, am_inactive);
    }
    else {
        /* New tasks arrived while executing: reschedule the port,
         * possibly on another run queue (emigration). */
#ifdef ERTS_SMP
        ErtsRunQueue *xrunq;
#endif

        ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
        ASSERT(pp->sched.taskq->first);

#ifdef ERTS_SMP
        xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
        if (!xrunq) {
#endif
            enqueue_port(runq, pp);
            ASSERT(pp->sched.exe_taskq);
            pp->sched.exe_taskq = NULL;
            /* No need to notify ourselves about inc in runq. */
#ifdef ERTS_SMP
        }
        else {
            /* Port emigrated ... */
            erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
            erts_smp_runq_unlock(runq);

            xrunq = erts_port_runq(pp);
            if (xrunq) {
                enqueue_port(xrunq, pp);
                ASSERT(pp->sched.exe_taskq);
                pp->sched.exe_taskq = NULL;
                erts_smp_runq_unlock(xrunq);
                erts_smp_notify_inc_runq(xrunq);
            }

            erts_smp_runq_lock(runq);
        }
#endif
    }

    res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
           != (erts_aint_t) 0);

    ERTS_PT_CHK_PRES_PORTQ(runq, pp);

    port_taskq_free(ptqp);

    /* trace port scheduling, out */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports(pp, am_out);
    }

#ifndef ERTS_SMP
    erts_port_release(pp);
#else
    {
        erts_aint_t refc;
        erts_smp_mtx_unlock(pp->lock);
        refc = erts_smp_atomic_dec_read_nob(&pp->refc);
        ASSERT(refc >= 0);
        if (refc == 0) {
            erts_smp_runq_unlock(runq);
            erts_port_cleanup(pp); /* Might acquire runq lock */
            erts_smp_runq_lock(runq);
            /* Re-read after possibly having released the runq lock */
            res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
                   != (erts_aint_t) 0);
        }
    }
#endif

 done:
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));

    ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);

    return res;
}

/*
 * Handle remaining tasks after a free task.
 *
 * Drains both the executing and pending task queues of a port that is
 * being freed: I/O tasks are reported as stale driver selects, the
 * rest are simply discarded.  Caller holds port and run queue locks;
 * the run queue lock is temporarily released around each task.
 */
static void
handle_remaining_tasks(ErtsRunQueue *runq, Port *pp)
{
    int i;
    ErtsPortTask *ptp;
    ErtsPortTaskQueue *ptqps[] = {pp->sched.exe_taskq, pp->sched.taskq};

    ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));

    for (i = 0; i < sizeof(ptqps)/sizeof(ErtsPortTaskQueue *); i++) {
        if (!ptqps[i])
            continue;
        ptp = pop_task(ptqps[i]);
        while (ptp) {
            reset_handle(ptp);
            erts_smp_runq_unlock(runq);
            switch (ptp->type) {
            case ERTS_PORT_TASK_FREE:
            case ERTS_PORT_TASK_TIMEOUT:
                break;
            case ERTS_PORT_TASK_INPUT:
                erts_stale_drv_select(pp->id, ptp->event, DO_READ, 1);
                break;
            case ERTS_PORT_TASK_OUTPUT:
                erts_stale_drv_select(pp->id, ptp->event, DO_WRITE, 1);
                break;
            case ERTS_PORT_TASK_EVENT:
                erts_stale_drv_select(pp->id, ptp->event, 0, 1);
                break;
            case ERTS_PORT_TASK_DIST_CMD:
                break;
            default:
                erl_exit(ERTS_ABORT_EXIT,
                         "Invalid port task type: %d\n",
                         (int) ptp->type);
            }
            port_task_free(ptp);
            erts_smp_runq_lock(runq);
            ptp = pop_task(ptqps[i]);
        }
    }
    ASSERT(!pp->sched.taskq || !pp->sched.taskq->first);
}

/* Return non-zero iff the port has pending or executing tasks.
 * Returns 0 when the port's run queue cannot be obtained. */
int
erts_port_is_scheduled(Port *pp)
{
    int res;
    ErtsRunQueue *runq = erts_port_runq(pp);
    if (!runq)
        return 0;
    res = pp->sched.taskq || pp->sched.exe_taskq;
    erts_smp_runq_unlock(runq);
    return res;
}

#ifdef ERTS_SMP

/* Re-insert an already-runnable port into run queue rq (used during
 * run queue balancing; rq must be locked and must be the port's own). */
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
    ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
    ASSERT(pp->sched.in_runq);
    enqueue_port(rq, pp);
}

/* Remove and return the first port of run queue rq, or NULL (used
 * during run queue balancing; rq must be locked). */
Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
    Port *pp;
    ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
    pp = pop_port(rq);
    ASSERT(!pp
           || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
    ASSERT(!pp || pp->sched.in_runq);
    return pp;
}

#endif

/*
 * Initialize the module.
 */
void
erts_port_task_init(void)
{
    erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
                             (erts_aint_t) 0);
    init_port_task_alloc();
    init_port_taskq_alloc();
}