/* * %CopyrightBegin% * * Copyright Ericsson AB 2006-2013. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in * compliance with the License. You should have received a copy of the * Erlang Public License along with this software. If not, it can be * retrieved online at http://www.erlang.org/. * * Software distributed under the License is distributed on an "AS IS" * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See * the License for the specific language governing rights and limitations * under the License. * * %CopyrightEnd% */ /* * Description: Scheduling of port tasks * * Author: Rickard Green */ #define ERL_PORT_TASK_C__ #ifdef HAVE_CONFIG_H # include "config.h" #endif #include "global.h" #include "erl_port_task.h" #include "dist.h" #include "dtrace-wrapper.h" #include /* * ERTS_PORT_CALLBACK_VREDS: Limit the amount of callback calls we do... */ #define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20) #if defined(DEBUG) && 0 #define ERTS_HARD_DEBUG_TASK_QUEUES #else #undef ERTS_HARD_DEBUG_TASK_QUEUES #endif #ifdef ERTS_HARD_DEBUG_TASK_QUEUES static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue); #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \ chk_task_queues((PP), (EQ), (PBQ)) #else #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) #endif #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ if (DTRACE_ENABLED(PROBE_NAME)) { \ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \ \ dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \ dtrace_port_str(PP, port_str); \ DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \ } #else #define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0) #endif erts_smp_atomic_t erts_port_task_outstanding_io_tasks; #define ERTS_PT_STATE_SCHEDULED 0 #define ERTS_PT_STATE_ABORTED 1 #define ERTS_PT_STATE_EXECUTING 2 typedef union { struct { /* I/O tasks */ ErlDrvEvent event; ErlDrvEventData event_data; } io; struct { ErtsProc2PortSigCallback callback; ErtsProc2PortSigData data; } psig; } ErtsPortTaskTypeData; struct ErtsPortTask_ { erts_smp_atomic32_t state; ErtsPortTaskType type; union { struct { ErtsPortTask *next; ErtsPortTaskHandle *handle; int flags; Uint32 ref[ERTS_MAX_REF_NUMBERS]; ErtsPortTaskTypeData td; } alive; ErtsThrPrgrLaterOp release; } u; }; struct ErtsPortTaskHandleList_ { ErtsPortTaskHandle handle; union { ErtsPortTaskHandleList *next; #ifdef ERTS_SMP ErtsThrPrgrLaterOp release; #endif } u; }; typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller; struct ErtsPortTaskBusyCaller_ { ErtsPortTaskBusyCaller *next; Eterm caller; SWord count; ErtsPortTask *last; }; #define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17 struct ErtsPortTaskBusyCallerTable_ { ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS]; ErtsPortTaskBusyCaller pre_alloc_busy_caller; }; static void begin_port_cleanup(Port *pp, ErtsPortTask **execq, int *processing_busy_q_p); ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task, ErtsPortTask, 1000, ERTS_ALC_T_PORT_TASK) ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table, ErtsPortTaskBusyCallerTable, 50, ERTS_ALC_T_BUSY_CALLER_TAB) #ifdef ERTS_SMP static void call_port_task_free(void *vptp) { port_task_free((ErtsPortTask *) vptp); } #endif static ERTS_INLINE void schedule_port_task_free(ErtsPortTask *ptp) { #ifdef ERTS_SMP erts_schedule_thr_prgr_later_op(call_port_task_free, (void *) ptp, &ptp->u.release); #else port_task_free(ptp); #endif } static ERTS_INLINE ErtsPortTask * p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp) { ErtsPortTask *ptp; char *ptr = (char *) sigdp; ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data); ptp = (ErtsPortTask *) ptr; ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG); return ptp; } ErtsProc2PortSigData * erts_port_task_alloc_p2p_sig_data(void) { ErtsPortTask *ptp = port_task_alloc(); ptp->type = ERTS_PORT_TASK_PROC_SIG; ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP; erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data)); return &ptp->u.alive.td.psig.data; } static ERTS_INLINE Eterm task_caller(ErtsPortTask *ptp) { Eterm caller; ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG); caller = ptp->u.alive.td.psig.data.caller; ASSERT(is_internal_pid(caller) || is_internal_port(caller)); return caller; } /* * Busy queue management */ static ERTS_INLINE int caller2bix(Eterm caller) { ASSERT(is_internal_pid(caller) || is_internal_port(caller)); return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS); } static void popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last) { ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp; ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; Eterm caller = task_caller(ptp); int bix = caller2bix(caller); ASSERT(is_internal_pid(caller)); ASSERT(tabp); bcp = tabp->bucket[bix]; prev_bcpp = &tabp->bucket[bix]; ASSERT(bcp); while (bcp->caller != caller) { prev_bcpp = &bcp->next; bcp = bcp->next; ASSERT(bcp); } ASSERT(bcp->count > 0); if (--bcp->count != 0) { ASSERT(!last); } else { *prev_bcpp = bcp->next; if (bcp == &tabp->pre_alloc_busy_caller) bcp->caller = am_undefined; else erts_free(ERTS_ALC_T_BUSY_CALLER, bcp); if (last) { #ifdef DEBUG erts_aint32_t flags = #endif erts_smp_atomic32_read_band_nob( &pp->sched.flags, ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); #ifdef DEBUG for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { ASSERT(!tabp->bucket[bix]); } #endif busy_caller_table_free(tabp); pp->sched.taskq.local.busy.first = NULL; pp->sched.taskq.local.busy.last = NULL; pp->sched.taskq.local.busy.table = NULL; } } } static void busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp) { ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; Eterm caller = task_caller(ptp); ErtsPortTaskBusyCaller *bcp; int bix; ASSERT(is_internal_pid(caller)); /* * Port is busy and this task type needs to wait until not busy. */ ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY); ptp->u.alive.next = NULL; if (pp->sched.taskq.local.busy.last) { ASSERT(pp->sched.taskq.local.busy.first); pp->sched.taskq.local.busy.last->u.alive.next = ptp; } else { int i; #ifdef DEBUG erts_aint32_t flags; #endif pp->sched.taskq.local.busy.first = ptp; #ifdef DEBUG flags = #endif erts_smp_atomic32_read_bor_nob(&pp->sched.flags, ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); ASSERT(!tabp); tabp = busy_caller_table_alloc(); pp->sched.taskq.local.busy.table = tabp; for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++) tabp->bucket[i] = NULL; tabp->pre_alloc_busy_caller.caller = am_undefined; } pp->sched.taskq.local.busy.last = ptp; bix = caller2bix(caller); ASSERT(tabp); bcp = tabp->bucket[bix]; while (bcp && bcp->caller != caller) bcp = bcp->next; if (bcp) bcp->count++; else { if (tabp->pre_alloc_busy_caller.caller == am_undefined) bcp = &tabp->pre_alloc_busy_caller; else bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER, sizeof(ErtsPortTaskBusyCaller)); bcp->caller = caller; bcp->count = 1; bcp->next = tabp->bucket[bix]; tabp->bucket[bix] = bcp; } bcp->last = ptp; } static ERTS_INLINE int check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp) { ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; ErtsPortTask *last_ptp; ErtsPortTaskBusyCaller *bcp; int bix; Eterm caller; ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP); ASSERT(pp->sched.taskq.local.busy.last); ASSERT(tabp); /* * We are either not busy, or the task does not imply wait on busy port. * However, due to the signaling order requirements the task might depend * on other tasks in the busy queue. */ caller = task_caller(ptp); bix = caller2bix(caller); bcp = tabp->bucket[bix]; while (bcp && bcp->caller != caller) bcp = bcp->next; if (!bcp) return 0; /* * There are other tasks that we depend on in the busy queue; * move into busy queue. */ bcp->count++; last_ptp = bcp->last; ptp->u.alive.next = last_ptp->u.alive.next; if (!ptp->u.alive.next) { ASSERT(pp->sched.taskq.local.busy.last == last_ptp); pp->sched.taskq.local.busy.last = ptp; } last_ptp->u.alive.next = ptp; bcp->last = ptp; return 1; } static void no_sig_dep_move_from_busyq(Port *pp) { ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; ErtsPortTask *first_ptp, *last_ptp, *ptp; ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL; /* * Move tasks at the head of the busy queue that no longer * have any dependencies to busy wait tasks into the ordinary * queue. */ first_ptp = ptp = pp->sched.taskq.local.busy.first; ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)); ASSERT(tabp); do { Eterm caller = task_caller(ptp); if (!bcp || bcp->caller != caller) { int bix = caller2bix(caller); prev_bcpp = &tabp->bucket[bix]; bcp = tabp->bucket[bix]; ASSERT(bcp); while (bcp->caller != caller) { ASSERT(bcp); prev_bcpp = &bcp->next; bcp = bcp->next; } } ASSERT(bcp->caller == caller); ASSERT(bcp->count > 0); if (--bcp->count == 0) { *prev_bcpp = bcp->next; if (bcp == &tabp->pre_alloc_busy_caller) bcp->caller = am_undefined; else erts_free(ERTS_ALC_T_BUSY_CALLER, bcp); } last_ptp = ptp; ptp = ptp->u.alive.next; } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)); pp->sched.taskq.local.busy.first = last_ptp->u.alive.next; if (!pp->sched.taskq.local.busy.first) { #ifdef DEBUG int bix; erts_aint32_t flags = #endif erts_smp_atomic32_read_band_nob( &pp->sched.flags, ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); #ifdef DEBUG for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { ASSERT(!tabp->bucket[bix]); } #endif busy_caller_table_free(tabp); pp->sched.taskq.local.busy.last = NULL; pp->sched.taskq.local.busy.table = NULL; } last_ptp->u.alive.next = pp->sched.taskq.local.first; pp->sched.taskq.local.first = first_ptp; } #ifdef ERTS_HARD_DEBUG_TASK_QUEUES static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue) { Sint tot_count, tot_table_count; int bix; ErtsPortTask *ptp, *last; ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first; ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq; ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; ErtsPortTaskBusyCaller *bcp; if (!first) { ASSERT(!tabp); ASSERT(!pp->sched.taskq.local.busy.last); ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); return; } ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(tabp); tot_count = 0; ptp = first; while (ptp) { Sint count = 0; Eterm caller = task_caller(ptp); int bix = caller2bix(caller); for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next) if (bcp->caller == caller) break; ASSERT(bcp && bcp->caller == caller); ASSERT(bcp->last); while (1) { ErtsPortTask *ptp2; ASSERT(caller == task_caller(ptp)); count++; tot_count++; last = ptp; for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) { ASSERT(ptp != ptp2); } if (ptp == bcp->last) break; ptp = ptp->u.alive.next; } ASSERT(count == bcp->count); ptp = ptp->u.alive.next; } tot_table_count = 0; for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next) tot_table_count += bcp->count; } ASSERT(tot_count == tot_table_count); ASSERT(last == pp->sched.taskq.local.busy.last); } #endif /* ERTS_HARD_DEBUG_TASK_QUEUES */ /* * Task handle manipulation. */ static ERTS_INLINE void reset_port_task_handle(ErtsPortTaskHandle *pthp) { erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL); } static ERTS_INLINE ErtsPortTask * handle2task(ErtsPortTaskHandle *pthp) { return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp); } static ERTS_INLINE void reset_handle(ErtsPortTask *ptp) { if (ptp->u.alive.handle) { ASSERT(ptp == handle2task(ptp->u.alive.handle)); reset_port_task_handle(ptp->u.alive.handle); } } static ERTS_INLINE void set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) { ptp->u.alive.handle = pthp; if (pthp) { erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp); ASSERT(ptp == handle2task(ptp->u.alive.handle)); } } /* * Busy port queue management */ static erts_aint32_t check_unset_busy_port_q(Port *pp, erts_aint32_t flags, ErtsPortTaskBusyPortQ *bpq) { ErlDrvSizeT qsize, low; int resume_procs = 0; ASSERT(bpq); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); erts_port_task_sched_lock(&pp->sched); qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); if (qsize < low) { erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT_Q); flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask); if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) resume_procs = 1; } else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) { flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; } erts_port_task_sched_unlock(&pp->sched); if (resume_procs) erts_port_resume_procs(pp); return flags; } static ERTS_INLINE void aborted_proc2port_data(Port *pp, ErlDrvSizeT size) { ErtsPortTaskBusyPortQ *bpq; erts_aint32_t flags; ErlDrvSizeT qsz; ASSERT(pp->sched.taskq.bpq); if (size == 0) return; bpq = pp->sched.taskq.bpq; qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, (erts_aint_t) -size); ASSERT(qsz + size > qsz); flags = erts_smp_atomic32_read_nob(&pp->sched.flags); ASSERT(pp->sched.taskq.bpq); if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q) return; if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) erts_smp_atomic32_read_bor_nob(&pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } static ERTS_INLINE void dequeued_proc2port_data(Port *pp, ErlDrvSizeT size) { ErtsPortTaskBusyPortQ *bpq; erts_aint32_t flags; ErlDrvSizeT qsz; ASSERT(pp->sched.taskq.bpq); if (size == 0) return; bpq = pp->sched.taskq.bpq; qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, (erts_aint_t) -size); ASSERT(qsz + size > qsz); flags = erts_smp_atomic32_read_nob(&pp->sched.flags); if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) return; if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low)) check_unset_busy_port_q(pp, flags, bpq); } static ERTS_INLINE erts_aint32_t enqueue_proc2port_data(Port *pp, ErtsProc2PortSigData *sigdp, erts_aint32_t flags) { ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; if (sigdp && bpq) { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); if (size) { erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size, (erts_aint_t) size); ErlDrvSizeT qsz = (ErlDrvSizeT) asize; ASSERT(qsz - size < qsz); if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) { flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags, ERTS_PTS_FLG_BUSY_PORT_Q); flags |= ERTS_PTS_FLG_BUSY_PORT_Q; qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size); if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) { flags = (erts_smp_atomic32_read_bor_relb( &pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)); flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; } } ASSERT(!(flags & ERTS_PTS_FLG_EXIT)); } } return flags; } /* * erl_drv_busy_msgq_limits() is called by drivers either reading or * writing the limits. * * A limit of zero is interpreted as a read only request (using a * limit of zero would not be useful). Other values are interpreted * as a write-read request. */ void erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp) { Port *pp = erts_drvport2port(dport, NULL); ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq; int written = 0, resume_procs = 0; ErlDrvSizeT low, high; if (!pp || !bpq) { if (lowp) *lowp = ERL_DRV_BUSY_MSGQ_DISABLED; if (highp) *highp = ERL_DRV_BUSY_MSGQ_DISABLED; return; } low = lowp ? *lowp : 0; high = highp ? *highp : 0; erts_port_task_sched_lock(&pp->sched); if (low == ERL_DRV_BUSY_MSGQ_DISABLED || high == ERL_DRV_BUSY_MSGQ_DISABLED) { /* Disable busy msgq feature */ erts_aint32_t flags; pp->sched.taskq.bpq = NULL; flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags); if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) resume_procs = 1; } else { if (!low) low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); else { if (bpq->high < low) bpq->high = low; erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); written = 1; } if (!high) high = bpq->high; else { if (low > high) { low = high; erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); } bpq->high = high; written = 1; } if (written) { ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); if (size > high) erts_smp_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_BUSY_PORT_Q); else if (size < low) erts_smp_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } } erts_port_task_sched_unlock(&pp->sched); if (resume_procs) erts_port_resume_procs(pp); if (lowp) *lowp = low; if (highp) *highp = high; } /* * No-suspend handles. */ #ifdef ERTS_SMP static void free_port_task_handle_list(void *vpthlp) { erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); } #endif static void schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) { #ifdef ERTS_SMP erts_schedule_thr_prgr_later_op(free_port_task_handle_list, (void *) pthlp, &pthlp->u.release); #else erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); #endif } static ERTS_INLINE void abort_nosuspend_task(Port *pp, ErtsPortTaskType type, ErtsPortTaskTypeData *tdp) { ASSERT(type == ERTS_PORT_TASK_PROC_SIG); if (!pp->sched.taskq.bpq) tdp->psig.callback(NULL, ERTS_PORT_SFLG_INVALID, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, &tdp->psig.data); else { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data); tdp->psig.callback(NULL, ERTS_PORT_SFLG_INVALID, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, &tdp->psig.data); aborted_proc2port_data(pp, size); } } static ErtsPortTaskHandleList * get_free_nosuspend_handles(Port *pp) { ErtsPortTaskHandleList *nshp, *last_nshp = NULL; ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched)); nshp = pp->sched.taskq.local.busy.nosuspend; while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) { last_nshp = nshp; nshp = nshp->u.next; } if (!last_nshp) nshp = NULL; else { nshp = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next; last_nshp->u.next = NULL; if (!pp->sched.taskq.local.busy.nosuspend) erts_smp_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_NS_TASKS); } return nshp; } static void free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp) { while (free_nshp) { ErtsPortTaskHandleList *nshp = free_nshp; free_nshp = free_nshp->u.next; schedule_port_task_handle_list_free(nshp); } } /* * Port queue operations */ static ERTS_INLINE void enqueue_port(ErtsRunQueue *runq, Port *pp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); pp->sched.next = NULL; if (runq->ports.end) { ASSERT(runq->ports.start); runq->ports.end->sched.next = pp; } else { ASSERT(!runq->ports.start); runq->ports.start = pp; } runq->ports.end = pp; ASSERT(runq->ports.start && runq->ports.end); erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); } static ERTS_INLINE Port * pop_port(ErtsRunQueue *runq) { Port *pp = runq->ports.start; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); if (!pp) { ASSERT(!runq->ports.end); } else { runq->ports.start = runq->ports.start->sched.next; if (!runq->ports.start) { ASSERT(runq->ports.end == pp); runq->ports.end = NULL; } erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); } ASSERT(runq->ports.start || !runq->ports.end); ASSERT(runq->ports.end || !runq->ports.start); return pp; } /* * Task queue operations */ static ERTS_INLINE int enqueue_task(Port *pp, ErtsPortTask *ptp, ErtsProc2PortSigData *sigdp, ErtsPortTaskHandleList *ns_pthlp, erts_aint32_t *flagsp) { int res; erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT; erts_aint32_t flags; ptp->u.alive.next = NULL; if (ns_pthlp) fail_flags |= ERTS_PTS_FLG_BUSY_PORT; erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_nob(&pp->sched.flags); if (flags & fail_flags) res = 0; else { if (ns_pthlp) { ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = ns_pthlp; } if (pp->sched.taskq.in.last) { ASSERT(pp->sched.taskq.in.first); ASSERT(!pp->sched.taskq.in.last->u.alive.next); pp->sched.taskq.in.last->u.alive.next = ptp; } else { ASSERT(!pp->sched.taskq.in.first); pp->sched.taskq.in.first = ptp; } pp->sched.taskq.in.last = ptp; flags = enqueue_proc2port_data(pp, sigdp, flags); res = 1; } erts_port_task_sched_unlock(&pp->sched); *flagsp = flags; return res; } static ERTS_INLINE void prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags); if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) { *execqp = pp->sched.taskq.local.first; *processing_busy_q_p = 0; } else { *execqp = pp->sched.taskq.local.busy.first; *processing_busy_q_p = 1; } ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); while (1) { erts_aint32_t new, exp; new = exp = act; new &= ~ERTS_PTS_FLG_IN_RUNQ; new |= ERTS_PTS_FLG_EXEC; act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp); ASSERT(act & ERTS_PTS_FLG_IN_RUNQ); if (exp == act) break; } } /* finalize_exec() return value != 0 if port should remain active */ static ERTS_INLINE int finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) { erts_aint32_t act; if (!processing_busy_q) pp->sched.taskq.local.first = *execq; else { pp->sched.taskq.local.busy.first = *execq; ASSERT(*execq); } ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q); *execq = NULL; act = erts_smp_atomic32_read_nob(&pp->sched.flags); if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq); while (1) { erts_aint32_t new, exp; new = exp = act; new &= ~ERTS_PTS_FLG_EXEC; if (act & ERTS_PTS_FLG_HAVE_TASKS) new |= ERTS_PTS_FLG_IN_RUNQ; act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); if (exp == act) break; } return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0; } static ERTS_INLINE erts_aint32_t select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags); if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq); ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); if (flags & ERTS_PTS_FLG_BUSY_PORT) { if (*processing_busy_q_p) { ErtsPortTask *ptp; ptp = pp->sched.taskq.local.busy.first = *execqp; if (!ptp) pp->sched.taskq.local.busy.last = NULL; else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) no_sig_dep_move_from_busyq(pp); *execqp = pp->sched.taskq.local.first; *processing_busy_q_p = 0; ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); } return flags; } /* Not busy */ if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) { pp->sched.taskq.local.first = *execqp; *execqp = pp->sched.taskq.local.busy.first; *processing_busy_q_p = 1; ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); } return flags; } /* * check_task_for_exec() returns a value !0 if the task * is ok to execute; otherwise 0. */ static ERTS_INLINE int check_task_for_exec(Port *pp, erts_aint32_t flags, ErtsPortTask **execqp, int *processing_busy_q_p, ErtsPortTask *ptp) { if (!*processing_busy_q_p) { /* Processing normal queue */ ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); if ((flags & ERTS_PTS_FLG_BUSY_PORT) && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) { busy_wait_move_to_busy_queue(pp, ptp); ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); return 0; } if (pp->sched.taskq.local.busy.last && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) { int res = !check_sig_dep_move_to_busy_queue(pp, ptp); ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); return res; } } else { /* Processing busy queue */ ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT)); ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); popped_from_busy_queue(pp, ptp, !*execqp); if (!*execqp) { *execqp = pp->sched.taskq.local.first; *processing_busy_q_p = 0; } ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); } return 1; } static ErtsPortTask * fetch_in_queue(Port *pp, ErtsPortTask **execqp) { ErtsPortTask *ptp; ErtsPortTaskHandleList *free_nshp = NULL; erts_port_task_sched_lock(&pp->sched); ptp = pp->sched.taskq.in.first; pp->sched.taskq.in.first = NULL; pp->sched.taskq.in.last = NULL; if (ptp) *execqp = ptp->u.alive.next; else erts_smp_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_TASKS); if (pp->sched.taskq.local.busy.nosuspend) free_nshp = get_free_nosuspend_handles(pp); erts_port_task_sched_unlock(&pp->sched); if (free_nshp) free_nosuspend_handles(free_nshp); return ptp; } static ERTS_INLINE ErtsPortTask * select_task_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { ErtsPortTask *ptp; erts_aint32_t flags; flags = select_queue_for_exec(pp, execqp, processing_busy_q_p); while (1) { ptp = *execqp; if (ptp) *execqp = ptp->u.alive.next; else { ptp = fetch_in_queue(pp, execqp); if (!ptp) return NULL; } if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp)) return ptp; } } /* * Abort a scheduled task. */ int erts_port_task_abort(ErtsPortTaskHandle *pthp) { int res; ErtsPortTask *ptp; #ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); #endif ptp = handle2task(pthp); if (!ptp) res = -1; else { erts_aint32_t old_state; #ifdef DEBUG ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle; ERTS_SMP_READ_MEMORY_BARRIER; old_state = erts_smp_atomic32_read_nob(&ptp->state); if (old_state == ERTS_PT_STATE_SCHEDULED) { ASSERT(saved_pthp == pthp); } #endif old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_ABORTED, ERTS_PT_STATE_SCHEDULED); if (old_state != ERTS_PT_STATE_SCHEDULED) res = - 1; /* Task already aborted, executing, or executed */ else { reset_port_task_handle(pthp); switch (ptp->type) { case ERTS_PORT_TASK_INPUT: case ERTS_PORT_TASK_OUTPUT: case ERTS_PORT_TASK_EVENT: ASSERT(erts_smp_atomic_read_nob( &erts_port_task_outstanding_io_tasks) > 0); erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); break; case ERTS_PORT_TASK_PROC_SIG: ERTS_INTERNAL_ERROR("Aborted process to port signal"); break; default: break; } res = 0; } } #ifdef ERTS_SMP erts_thr_progress_unmanaged_continue(dhndl); #endif return res; } void erts_port_task_abort_nosuspend_tasks(Port *pp) { ErtsPortTaskHandleList *abort_list; #ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID; #endif erts_port_task_sched_lock(&pp->sched); erts_smp_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_NS_TASKS); abort_list = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = NULL; erts_port_task_sched_unlock(&pp->sched); while (abort_list) { #ifdef DEBUG ErtsPortTaskHandle *saved_pthp; #endif ErtsPortTaskType type; ErtsPortTaskTypeData td; ErtsPortTaskHandle *pthp; ErtsPortTask *ptp; ErtsPortTaskHandleList *pthlp; erts_aint32_t old_state; pthlp = abort_list; abort_list = pthlp->u.next; #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) dhndl = erts_thr_progress_unmanaged_delay(); #endif pthp = &pthlp->handle; ptp = handle2task(pthp); if (!ptp) { #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); #endif schedule_port_task_handle_list_free(pthlp); continue; } #ifdef DEBUG saved_pthp = ptp->u.alive.handle; ERTS_SMP_READ_MEMORY_BARRIER; old_state = erts_smp_atomic32_read_nob(&ptp->state); if (old_state == ERTS_PT_STATE_SCHEDULED) { ASSERT(saved_pthp == pthp); } #endif old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_ABORTED, ERTS_PT_STATE_SCHEDULED); if (old_state != ERTS_PT_STATE_SCHEDULED) { /* Task already aborted, executing, or executed */ #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); #endif schedule_port_task_handle_list_free(pthlp); continue; } reset_port_task_handle(pthp); type = ptp->type; td = ptp->u.alive.td; #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); #endif schedule_port_task_handle_list_free(pthlp); abort_nosuspend_task(pp, type, &td); } } /* * Schedule a task. */ int erts_port_task_schedule(Eterm id, ErtsPortTaskHandle *pthp, ErtsPortTaskType type, ...) { ErtsProc2PortSigData *sigdp = NULL; ErtsPortTaskHandleList *ns_pthlp = NULL; #ifdef ERTS_SMP ErtsRunQueue *xrunq; ErtsThrPrgrDelayHandle dhndl; #endif ErtsRunQueue *runq; Port *pp; ErtsPortTask *ptp = NULL; erts_aint32_t act, add_flags; if (pthp && erts_port_task_is_scheduled(pthp)) { ASSERT(0); erts_port_task_abort(pthp); } ASSERT(is_internal_port(id)); #ifdef ERTS_SMP dhndl = erts_thr_progress_unmanaged_delay(); #endif pp = erts_port_lookup_raw(id); #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { if (pp) erts_port_inc_refc(pp); erts_thr_progress_unmanaged_continue(dhndl); } #endif if (!pp) goto fail; if (type != ERTS_PORT_TASK_PROC_SIG) { ptp = port_task_alloc(); ptp->type = type; ptp->u.alive.flags = 0; erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); set_handle(ptp, pthp); } switch (type) { case ERTS_PORT_TASK_INPUT: case ERTS_PORT_TASK_OUTPUT: { va_list argp; va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); va_end(argp); erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } case ERTS_PORT_TASK_EVENT: { va_list argp; va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData); va_end(argp); erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } case ERTS_PORT_TASK_PROC_SIG: { va_list argp; ASSERT(!pthp); va_start(argp, type); sigdp = va_arg(argp, ErtsProc2PortSigData *); ptp = p2p_sig_data_to_task(sigdp); ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback); ptp->u.alive.flags |= va_arg(argp, int); va_end(argp); if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)) set_handle(ptp, pthp); else { ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, sizeof(ErtsPortTaskHandleList)); set_handle(ptp, &ns_pthlp->handle); } break; } default: break; } if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) { reset_handle(ptp); if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT)) goto abort_nosuspend; else goto fail; } add_flags = ERTS_PTS_FLG_HAVE_TASKS; if (ns_pthlp) add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS; while (1) { erts_aint32_t new, exp; if ((act & add_flags) == add_flags && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) goto done; /* Done */ new = exp = act; new |= add_flags; if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) new |= ERTS_PTS_FLG_IN_RUNQ; act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); if (exp == act) { if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) break; /* Need to enqueue port */ goto done; /* Done */ } if (act & ERTS_PTS_FLG_EXIT) goto done; /* Died after our task insert... */ } /* Enqueue port on run-queue */ runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); #ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); if (xrunq) { /* Port emigrated ... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); } #endif enqueue_port(runq, pp); if (erts_system_profile_flags.runnable_ports) { profile_runnable_port(pp, am_active); } erts_smp_runq_unlock(runq); erts_smp_notify_inc_runq(runq); done: #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); #endif return 0; abort_nosuspend: #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); #endif abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td); ASSERT(ns_pthlp); erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); if (ptp) port_task_free(ptp); return 0; fail: #ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); #endif if (ns_pthlp) erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); if (ptp) port_task_free(ptp); return -1; } void erts_port_task_free_port(Port *pp) { erts_aint32_t flags; ErtsRunQueue *runq; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); erts_port_task_sched_lock(&pp->sched); flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_EXIT); erts_port_task_sched_unlock(&pp->sched); erts_atomic32_read_bset_relb(&pp->state, (ERTS_PORT_SFLG_CONNECTED | ERTS_PORT_SFLG_EXITING | ERTS_PORT_SFLG_CLOSING | ERTS_PORT_SFLG_FREE), ERTS_PORT_SFLG_FREE); erts_smp_runq_unlock(runq); if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) begin_port_cleanup(pp, NULL, NULL); } /* * Execute scheduled tasks of a port. * * erts_port_task_execute() is called by scheduler threads between * scheduling of processes. Run-queue lock should be held by caller. */ int erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; ErtsPortTask *execq; int processing_busy_q; int res = 0; int vreds = 0; int reds = ERTS_PORT_REDS_EXECUTE; erts_aint_t io_tasks_executed = 0; int fpe_was_unmasked; erts_aint32_t state; int active; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); pp = pop_port(runq); if (!pp) { res = 0; goto done; } erts_smp_runq_unlock(runq); *curr_port_pp = pp; if (erts_sched_stat.enabled) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no); int migrated = old && old != esdp->no; erts_smp_spin_lock(&erts_sched_stat.lock); erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++; erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++; if (migrated) { erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++; erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++; } erts_smp_spin_unlock(&erts_sched_stat.lock); } prepare_exec(pp, &execq, &processing_busy_q); erts_smp_port_lock(pp); /* trace port scheduling, in */ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_in); } fpe_was_unmasked = erts_block_fpe(); state = erts_atomic32_read_nob(&pp->state); goto begin_handle_tasks; while (1) { erts_aint32_t task_state; ErtsPortTask *ptp; ptp = select_task_for_exec(pp, &execq, &processing_busy_q); if (!ptp) break; task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_EXECUTING, ERTS_PT_STATE_SCHEDULED); if (task_state != ERTS_PT_STATE_SCHEDULED) { ASSERT(task_state == ERTS_PT_STATE_ABORTED); goto aborted_port_task; } reset_handle(ptp); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); ERTS_SMP_CHK_NO_PROC_LOCKS; ASSERT(pp->drv_ptr); switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; if (!(state & ERTS_PORT_SFLGS_DEAD)) { DTRACE_DRIVER(driver_timeout, pp); (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); } break; case ERTS_PORT_TASK_INPUT: reds += ERTS_PORT_REDS_INPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: reds += ERTS_PORT_REDS_OUTPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_EVENT: reds += ERTS_PORT_REDS_EVENT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event, ptp->u.alive.td.io.event_data); io_tasks_executed++; break; case ERTS_PORT_TASK_PROC_SIG: { ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); if (!pp->sched.taskq.bpq) reds += ptp->u.alive.td.psig.callback(pp, state, ERTS_PROC2PORT_SIG_EXEC, sigdp); else { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); reds += ptp->u.alive.td.psig.callback(pp, state, ERTS_PROC2PORT_SIG_EXEC, sigdp); dequeued_proc2port_data(pp, size); } break; } case ERTS_PORT_TASK_DIST_CMD: reds += erts_dist_command(pp, CONTEXT_REDS-reds); break; default: erl_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); break; } reds += erts_port_driver_callback_epilogue(pp, &state); aborted_port_task: schedule_port_task_free(ptp); begin_handle_tasks: if (state & ERTS_PORT_SFLG_FREE) { reds += ERTS_PORT_REDS_FREE; begin_port_cleanup(pp, &execq, &processing_busy_q); break; } vreds += ERTS_PORT_CALLBACK_VREDS; reds += ERTS_PORT_CALLBACK_VREDS; if (reds >= CONTEXT_REDS) break; } erts_unblock_fpe(fpe_was_unmasked); /* trace port scheduling, out */ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_out); } if (io_tasks_executed) { ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) >= io_tasks_executed); erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks, -1*io_tasks_executed); } #ifdef ERTS_SMP ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); #endif active = finalize_exec(pp, &execq, processing_busy_q); erts_port_release(pp); *curr_port_pp = NULL; erts_smp_runq_lock(runq); if (!active) { if (erts_system_profile_flags.runnable_ports) profile_runnable_port(pp, am_inactive); } else { #ifdef ERTS_SMP ErtsRunQueue *xrunq; #endif ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); #ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); if (!xrunq) { #endif enqueue_port(runq, pp); /* No need to notify ourselves about inc in runq. */ #ifdef ERTS_SMP } else { /* Port emigrated ... */ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); xrunq = erts_port_runq(pp); ASSERT(xrunq); enqueue_port(xrunq, pp); erts_smp_runq_unlock(xrunq); erts_smp_notify_inc_runq(xrunq); erts_smp_runq_lock(runq); } #endif } done: res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) != (erts_aint_t) 0); reds -= vreds; runq->scheduler->reductions += reds; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds); return res; } #ifdef ERTS_SMP static void release_port(void *vport) { erts_port_dec_refc((Port *) vport); } #endif static void begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { int i, max; ErtsPortTaskBusyCallerTable *tabp; ErtsPortTask *qs[3]; ErtsPortTaskHandleList *free_nshp = NULL; ErtsProcList *plp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); /* * Abort remaining tasks... * * We want to process queues in the following order in order * to preserve signal ordering guarantees: * 1. Local busy queue * 2. Local queue * 3. In queue */ max = 0; if (!execqp) { if (pp->sched.taskq.local.busy.first) qs[max++] = pp->sched.taskq.local.busy.first; if (pp->sched.taskq.local.first) qs[max++] = pp->sched.taskq.local.first; } else { if (*processing_busy_q_p) { if (*execqp) qs[max++] = *execqp; if (pp->sched.taskq.local.first) qs[max++] = pp->sched.taskq.local.first; } else { if (pp->sched.taskq.local.busy.first) qs[max++] = pp->sched.taskq.local.busy.first; if (*execqp) qs[max++] = *execqp; } *execqp = NULL; *processing_busy_q_p = 0; } pp->sched.taskq.local.busy.first = NULL; pp->sched.taskq.local.busy.last = NULL; pp->sched.taskq.local.first = NULL; tabp = pp->sched.taskq.local.busy.table; if (tabp) { int bix; for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix]; while (bcp) { ErtsPortTaskBusyCaller *free_bcp = bcp; bcp = bcp->next; if (free_bcp != &tabp->pre_alloc_busy_caller) erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp); } } busy_caller_table_free(tabp); pp->sched.taskq.local.busy.table = NULL; } erts_port_task_sched_lock(&pp->sched); qs[max] = pp->sched.taskq.in.first; pp->sched.taskq.in.first = NULL; pp->sched.taskq.in.last = NULL; erts_port_task_sched_unlock(&pp->sched); if (qs[max]) max++; for (i = 0; i < max; i++) { while (1) { erts_aint32_t state; ErtsPortTask *ptp = qs[i]; if (!ptp) break; qs[i] = ptp->u.alive.next; /* Normal case here is aborted tasks... */ state = erts_smp_atomic32_read_nob(&ptp->state); if (state == ERTS_PT_STATE_ABORTED) goto aborted_port_task; state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_EXECUTING, ERTS_PT_STATE_SCHEDULED); if (state != ERTS_PT_STATE_SCHEDULED) { ASSERT(state == ERTS_PT_STATE_ABORTED); goto aborted_port_task; } reset_handle(ptp); switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: break; case ERTS_PORT_TASK_INPUT: erts_stale_drv_select(pp->common.id, ptp->u.alive.td.io.event, DO_READ, 1); break; case ERTS_PORT_TASK_OUTPUT: erts_stale_drv_select(pp->common.id, ptp->u.alive.td.io.event, DO_WRITE, 1); break; case ERTS_PORT_TASK_EVENT: erts_stale_drv_select(pp->common.id, ptp->u.alive.td.io.event, 0, 1); break; case ERTS_PORT_TASK_DIST_CMD: break; case ERTS_PORT_TASK_PROC_SIG: { ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; if (!pp->sched.taskq.bpq) ptp->u.alive.td.psig.callback(NULL, ERTS_PORT_SFLG_INVALID, ERTS_PROC2PORT_SIG_ABORT_CLOSED, sigdp); else { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); ptp->u.alive.td.psig.callback(NULL, ERTS_PORT_SFLG_INVALID, ERTS_PROC2PORT_SIG_ABORT_CLOSED, sigdp); aborted_proc2port_data(pp, size); } break; } default: erl_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); } aborted_port_task: schedule_port_task_free(ptp); } } erts_smp_atomic32_read_band_nob(&pp->sched.flags, ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS |ERTS_PTS_FLG_HAVE_TASKS |ERTS_PTS_FLGS_BUSY)); erts_port_task_sched_lock(&pp->sched); /* Cleanup nosuspend handles... */ free_nshp = (pp->sched.taskq.local.busy.nosuspend ? get_free_nosuspend_handles(pp) : NULL); ASSERT(!pp->sched.taskq.local.busy.nosuspend); /* Make sure not to leave any processes suspended on the port... */ plp = pp->suspended; pp->suspended = NULL; erts_port_task_sched_unlock(&pp->sched); if (free_nshp) free_nosuspend_handles(free_nshp); if (erts_proclist_fetch(&plp, NULL)) { #ifdef USE_VM_PROBES if (DTRACE_ENABLED(process_port_unblocked)) { DTRACE_CHARBUF(port_str, 16); DTRACE_CHARBUF(pid_str, 16); ErtsProcList* plp2 = plp; erts_snprintf(port_str, sizeof(port_str), "%T", pp->common.id); while (plp2 != NULL) { erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid); DTRACE2(process_port_unblocked, pid_str, port_str); } } #endif erts_resume_processes(plp); } /* * Schedule cleanup of port structure... */ #ifdef ERTS_SMP erts_schedule_thr_prgr_later_op(release_port, (void *) pp, &pp->common.u.release); #else pp->cleanup = 1; #endif } int erts_port_is_scheduled(Port *pp) { erts_aint32_t flags = erts_smp_atomic32_read_acqb(&pp->sched.flags); return (flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) != 0; } #ifdef ERTS_SMP void erts_enqueue_port(ErtsRunQueue *rq, Port *pp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ); enqueue_port(rq, pp); } Port * erts_dequeue_port(ErtsRunQueue *rq) { Port *pp; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); pp = pop_port(rq); ASSERT(!pp || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ)); return pp; } #endif /* * Initialize the module. */ void erts_port_task_init(void) { erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks, (erts_aint_t) 0); init_port_task_alloc(); init_busy_caller_table_alloc(); }