diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 927 |
1 files changed, 859 insertions, 68 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index e613ad0144..e8ed00daee 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -36,9 +36,20 @@ #include <stdarg.h> #if defined(DEBUG) && 0 -#define HARD_DEBUG +#define ERTS_HARD_DEBUG_TASK_QUEUES +#else +#undef ERTS_HARD_DEBUG_TASK_QUEUES +#endif + +#ifdef ERTS_HARD_DEBUG_TASK_QUEUES +static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue); +#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \ + chk_task_queues((PP), (EQ), (PBQ)) +#else +#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) #endif + /* * Costs in reductions for some port operations. */ @@ -52,7 +63,7 @@ #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ - if (DTRACE_ENABLED(driver_ready_input)) { \ + if (DTRACE_ENABLED(PROBE_NAME)) { \ DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \ DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \ \ @@ -70,6 +81,17 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks; #define ERTS_PT_STATE_ABORTED 1 #define ERTS_PT_STATE_EXECUTING 2 +typedef union { + struct { /* I/O tasks */ + ErlDrvEvent event; + ErlDrvEventData event_data; + } io; + struct { + ErtsProc2PortSigCallback callback; + ErtsProc2PortSigData data; + } psig; +} ErtsPortTaskTypeData; + struct ErtsPortTask_ { erts_smp_atomic32_t state; ErtsPortTaskType type; @@ -77,17 +99,39 @@ struct ErtsPortTask_ { struct { ErtsPortTask *next; ErtsPortTaskHandle *handle; - union { - struct { /* I/O tasks */ - ErlDrvEvent event; - ErlDrvEventData event_data; - } io; - } u; + int flags; + Uint32 ref[ERTS_MAX_REF_NUMBERS]; + ErtsPortTaskTypeData td; } alive; ErtsThrPrgrLaterOp release; } u; }; +struct ErtsPortTaskHandleList_ { + ErtsPortTaskHandle handle; + union { + ErtsPortTaskHandleList *next; +#ifdef ERTS_SMP + ErtsThrPrgrLaterOp release; +#endif + } u; +}; + +typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller; +struct ErtsPortTaskBusyCaller_ { + ErtsPortTaskBusyCaller *next; + Eterm caller; + SWord count; + ErtsPortTask *last; +}; + +#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17 +struct ErtsPortTaskBusyCallerTable_ { + ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS]; + ErtsPortTaskBusyCaller pre_alloc_busy_caller; +}; + + static void begin_port_cleanup(Port *pp, ErtsPortTask **execq); ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task, @@ -95,6 +139,11 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task, 1000, ERTS_ALC_T_PORT_TASK) +ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table, + ErtsPortTaskBusyCallerTable, + 50, + ERTS_ALC_T_BUSY_CALLER_TAB) + #ifdef ERTS_SMP static void call_port_task_free(void *vptp) @@ -115,6 +164,358 @@ schedule_port_task_free(ErtsPortTask *ptp) #endif } +static ERTS_INLINE ErtsPortTask * +p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp) +{ + ErtsPortTask *ptp; + char *ptr = (char *) sigdp; + ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data); + ptp = (ErtsPortTask *) ptr; + ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG); + return ptp; +} + +ErtsProc2PortSigData * +erts_port_task_alloc_p2p_sig_data(void) +{ + ErtsPortTask *ptp = port_task_alloc(); + + ptp->type = ERTS_PORT_TASK_PROC_SIG; + ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP; + erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); + + ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data)); + + return &ptp->u.alive.td.psig.data; +} + +static ERTS_INLINE Eterm +task_caller(ErtsPortTask *ptp) +{ + Eterm caller; + + ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG); + + caller = ptp->u.alive.td.psig.data.caller; + + ASSERT(is_internal_pid(caller) || is_internal_port(caller)); + + return caller; +} + +/* + * Busy queue management + */ + +static ERTS_INLINE int +caller2bix(Eterm caller) +{ + ASSERT(is_internal_pid(caller) || is_internal_port(caller)); + return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS); +} + + +static void +popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last) +{ + ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp; + ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; + Eterm caller = task_caller(ptp); + int bix = caller2bix(caller); + + ASSERT(is_internal_pid(caller)); + + ASSERT(tabp); + bcp = tabp->bucket[bix]; + prev_bcpp = &tabp->bucket[bix]; + ASSERT(bcp); + while (bcp->caller != caller) { + prev_bcpp = &bcp->next; + bcp = bcp->next; + ASSERT(bcp); + } + ASSERT(bcp->count > 0); + if (--bcp->count != 0) { + ASSERT(!last); + } + else { + *prev_bcpp = bcp->next; + if (bcp == &tabp->pre_alloc_busy_caller) + bcp->caller = am_undefined; + else + erts_free(ERTS_ALC_T_BUSY_CALLER, bcp); + if (last) { +#ifdef DEBUG + erts_aint32_t flags = +#endif + erts_smp_atomic32_read_band_nob( + &pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); + ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); +#ifdef DEBUG + for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { + ASSERT(!tabp->bucket[bix]); + } +#endif + busy_caller_table_free(tabp); + pp->sched.taskq.local.busy.first = NULL; + pp->sched.taskq.local.busy.last = NULL; + pp->sched.taskq.local.busy.table = NULL; + } + } +} + +static void +busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp) +{ + ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; + Eterm caller = task_caller(ptp); + ErtsPortTaskBusyCaller *bcp; + int bix; + + ASSERT(is_internal_pid(caller)); + /* + * Port is busy and this task type needs to wait until not busy. + */ + + ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY); + + ptp->u.alive.next = NULL; + if (pp->sched.taskq.local.busy.last) { + ASSERT(pp->sched.taskq.local.busy.first); + pp->sched.taskq.local.busy.last->u.alive.next = ptp; + } + else { + int i; + erts_aint32_t flags; + + pp->sched.taskq.local.busy.first = ptp; + flags = erts_smp_atomic32_read_bor_nob(&pp->sched.flags, + ERTS_PTS_FLG_HAVE_BUSY_TASKS); + ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); + + ASSERT(!tabp); + + tabp = busy_caller_table_alloc(); + pp->sched.taskq.local.busy.table = tabp; + for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++) + tabp->bucket[i] = NULL; + tabp->pre_alloc_busy_caller.caller = am_undefined; + } + pp->sched.taskq.local.busy.last = ptp; + + bix = caller2bix(caller); + ASSERT(tabp); + bcp = tabp->bucket[bix]; + + while (bcp && bcp->caller != caller) + bcp = bcp->next; + + if (bcp) + bcp->count++; + else { + if (tabp->pre_alloc_busy_caller.caller == am_undefined) + bcp = &tabp->pre_alloc_busy_caller; + else + bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER, + sizeof(ErtsPortTaskBusyCaller)); + bcp->caller = caller; + bcp->count = 1; + bcp->next = tabp->bucket[bix]; + tabp->bucket[bix] = bcp; + } + + bcp->last = ptp; +} + +static ERTS_INLINE int +check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp) +{ + ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; + ErtsPortTask *last_ptp; + ErtsPortTaskBusyCaller *bcp; + int bix; + Eterm caller; + + ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP); + ASSERT(pp->sched.taskq.local.busy.last); + ASSERT(tabp); + + + /* + * We are either not busy, or the task does not imply wait on busy port. + * However, due to the signaling order requirements the task might depend + * on other tasks in the busy queue. + */ + + caller = task_caller(ptp); + bix = caller2bix(caller); + bcp = tabp->bucket[bix]; + while (bcp && bcp->caller != caller) + bcp = bcp->next; + + if (!bcp) + return 0; + + /* + * There are other tasks that we depend on in the busy queue; + * move into busy queue. + */ + + bcp->count++; + last_ptp = bcp->last; + ptp->u.alive.next = last_ptp->u.alive.next; + if (!ptp->u.alive.next) { + ASSERT(pp->sched.taskq.local.busy.last == last_ptp); + pp->sched.taskq.local.busy.last = ptp; + } + last_ptp->u.alive.next = ptp; + bcp->last = ptp; + + return 1; +} + +static void +no_sig_dep_move_from_busyq(Port *pp) +{ + ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; + ErtsPortTask *first_ptp, *last_ptp, *ptp; + ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL; + + /* + * Move tasks at the head of the busy queue that no longer + * have any dependencies to busy wait tasks into the ordinary + * queue. + */ + + first_ptp = ptp = pp->sched.taskq.local.busy.first; + + ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)); + ASSERT(tabp); + + do { + Eterm caller = task_caller(ptp); + + if (!bcp || bcp->caller != caller) { + int bix = caller2bix(caller); + + prev_bcpp = &tabp->bucket[bix]; + bcp = tabp->bucket[bix]; + ASSERT(bcp); + while (bcp->caller != caller) { + ASSERT(bcp); + prev_bcpp = &bcp->next; + bcp = bcp->next; + } + } + + ASSERT(bcp->caller == caller); + ASSERT(bcp->count > 0); + + if (--bcp->count == 0) { + *prev_bcpp = bcp->next; + if (bcp == &tabp->pre_alloc_busy_caller) + bcp->caller = am_undefined; + else + erts_free(ERTS_ALC_T_BUSY_CALLER, bcp); + } + + last_ptp = ptp; + ptp = ptp->u.alive.next; + } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)); + + pp->sched.taskq.local.busy.first = last_ptp->u.alive.next; + if (!pp->sched.taskq.local.busy.first) { +#ifdef DEBUG + int bix; + erts_aint32_t flags = +#endif + erts_smp_atomic32_read_band_nob( + &pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); + ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); +#ifdef DEBUG + for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { + ASSERT(!tabp->bucket[bix]); + } +#endif + busy_caller_table_free(tabp); + pp->sched.taskq.local.busy.last = NULL; + pp->sched.taskq.local.busy.table = NULL; + } + last_ptp->u.alive.next = pp->sched.taskq.local.first; + pp->sched.taskq.local.first = first_ptp; +} + +#ifdef ERTS_HARD_DEBUG_TASK_QUEUES + +static void +chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue) +{ + Sint tot_count, tot_table_count; + int bix; + ErtsPortTask *ptp, *last; + ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first; + ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq; + ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table; + ErtsPortTaskBusyCaller *bcp; + + if (!first) { + ASSERT(!tabp); + ASSERT(!pp->sched.taskq.local.busy.last); + ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); + return; + } + + ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS); + ASSERT(tabp); + + tot_count = 0; + ptp = first; + while (ptp) { + Sint count = 0; + Eterm caller = task_caller(ptp); + int bix = caller2bix(caller); + for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next) + if (bcp->caller == caller) + break; + ASSERT(bcp && bcp->caller == caller); + + ASSERT(bcp->last); + while (1) { + ErtsPortTask *ptp2; + + ASSERT(caller == task_caller(ptp)); + count++; + tot_count++; + last = ptp; + + for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) { + ASSERT(ptp != ptp2); + } + + if (ptp == bcp->last) + break; + ptp = ptp->u.alive.next; + } + + ASSERT(count == bcp->count); + ptp = ptp->u.alive.next; + } + + tot_table_count = 0; + for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) { + for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next) + tot_table_count += bcp->count; + } + + ASSERT(tot_count == tot_table_count); + + ASSERT(last == pp->sched.taskq.local.busy.last); +} + +#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */ + /* * Task handle manipulation. */ @@ -151,6 +552,141 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) } /* + * No-suspend handles. + */ + +#ifdef ERTS_SMP +static void +free_port_task_handle_list(void *vpthlp) +{ + erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); +} +#endif + +static void +schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) +{ +#ifdef ERTS_SMP + erts_schedule_thr_prgr_later_op(free_port_task_handle_list, + (void *) pthlp, + &pthlp->u.release); +#else + erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); +#endif +} + +static ERTS_INLINE void +abort_nosuspend_task(ErtsPortTaskType type, + ErtsPortTaskTypeData *tdp) +{ + + if (type != ERTS_PORT_TASK_PROC_SIG) + ERTS_INTERNAL_ERROR("Invalid no-suspend port task type"); + + tdp->psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + &tdp->psig.data); +} + + +static void +save_nosuspend_handle(Port *pp, ErtsPortTask *ptp) +{ + erts_aint32_t act; + ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST, + sizeof(ErtsPortTaskHandleList)); + + set_handle(ptp, &pthlp->handle); + + ASSERT(ptp == handle2task(&pthlp->handle)); + ASSERT(ptp->u.alive.handle == &pthlp->handle); + + act = erts_smp_atomic32_read_nob(&pp->sched.flags); + + if (!(act & ERTS_PTS_FLG_BUSY)) { + + erts_port_task_sched_lock(&pp->sched); + pthlp->u.next = pp->sched.taskq.local.busy.nosuspend; + pp->sched.taskq.local.busy.nosuspend = pthlp; + erts_port_task_sched_unlock(&pp->sched); + + act = erts_smp_atomic32_read_nob(&pp->sched.flags); + + while (1) { + erts_aint32_t exp, new; + + if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS)) + == ERTS_PTS_FLG_HAVE_NS_TASKS) + return; + + if (act & ERTS_PTS_FLG_BUSY) { + erts_aint32_t s; + s = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + ERTS_PT_STATE_ABORTED, + ERTS_PT_STATE_SCHEDULED); + if (s == ERTS_PT_STATE_SCHEDULED) { + reset_port_task_handle(&pthlp->handle); + break; /* Abort task */ + } + /* Else: someone else handled it */ + return; + } + + new = exp = act; + + new |= ERTS_PTS_FLG_HAVE_NS_TASKS; + + act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); + if (act == exp) + return; + + } + } + + + abort_nosuspend_task(ptp->type, &ptp->u.alive.td); +} + + +static ErtsPortTaskHandleList * +get_free_nosuspend_handles(Port *pp) +{ + ErtsPortTaskHandleList *nshp, *last_nshp = NULL; + + ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched)); + + nshp = pp->sched.taskq.local.busy.nosuspend; + + while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) { + last_nshp = nshp; + nshp = nshp->u.next; + } + + if (!last_nshp) + nshp = NULL; + else { + nshp = pp->sched.taskq.local.busy.nosuspend; + pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next; + last_nshp->u.next = NULL; + if (!pp->sched.taskq.local.busy.nosuspend) + erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_NS_TASKS); + } + return nshp; +} + +static void +free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp) +{ + while (free_nshp) { + ErtsPortTaskHandleList *nshp = free_nshp; + free_nshp = free_nshp->u.next; + schedule_port_task_handle_list_free(nshp); + } +} + +/* * Port queue operations */ @@ -225,42 +761,21 @@ enqueue_task(Port *pp, ErtsPortTask *ptp) return flags; } -static ERTS_INLINE ErtsPortTask * -pop_task(Port *pp, ErtsPortTask **execqp) +static ERTS_INLINE void +prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { - ErtsPortTask *ptp; + erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags); - ptp = *execqp; - if (ptp) { - *execqp = ptp->u.alive.next; - return ptp; + if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) { + *execqp = pp->sched.taskq.local.first; + *processing_busy_q_p = 0; + } + else { + *execqp = pp->sched.taskq.local.busy.first; + *processing_busy_q_p = 1; } - ASSERT(!pp->sched.taskq.local); - - erts_port_task_sched_lock(&pp->sched); - ptp = pp->sched.taskq.in.first; - pp->sched.taskq.in.first = NULL; - pp->sched.taskq.in.last = NULL; - if (ptp) - *execqp = ptp->u.alive.next; - else - erts_smp_atomic32_read_band_nob(&pp->sched.flags, - ~ERTS_PTS_FLG_HAVE_TASKS); - erts_port_task_sched_unlock(&pp->sched); - - - return ptp; -} - -static ERTS_INLINE void -prepare_exec(Port *pp, ErtsPortTask **execqp) -{ - erts_aint32_t act; - *execqp = pp->sched.taskq.local; - - /* guess a likely value */ - act = ERTS_PTS_FLG_HAVE_TASKS|ERTS_PTS_FLG_IN_RUNQ; + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); while (1) { erts_aint32_t new, exp; @@ -281,11 +796,19 @@ prepare_exec(Port *pp, ErtsPortTask **execqp) /* finalize_exec() return value != 0 if port should remain active */ static ERTS_INLINE int -finalize_exec(Port *pp, ErtsPortTask **execq) +finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) { erts_aint32_t act; - pp->sched.taskq.local = *execq; + if (!processing_busy_q) + pp->sched.taskq.local.first = *execq; + else { + pp->sched.taskq.local.busy.first = *execq; + ASSERT(*execq); + } + + ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q); + *execq = NULL; /* guess a likely value */ @@ -313,6 +836,155 @@ finalize_exec(Port *pp, ErtsPortTask **execq) return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0; } +static ERTS_INLINE erts_aint32_t +select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) +{ + erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + + if (flags & ERTS_PTS_FLG_BUSY) { + if (*processing_busy_q_p) { + ErtsPortTask *ptp; + + ptp = pp->sched.taskq.local.busy.first = *execqp; + if (!ptp) + pp->sched.taskq.local.busy.last = NULL; + else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) + no_sig_dep_move_from_busyq(pp); + + *execqp = pp->sched.taskq.local.first; + *processing_busy_q_p = 0; + + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + } + + return flags; + } + + /* Not busy */ + + if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) { + pp->sched.taskq.local.first = *execqp; + *execqp = pp->sched.taskq.local.busy.first; + *processing_busy_q_p = 1; + + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + } + + return flags; +} + +/* + * check_task_for_exec() returns a value !0 if the task + * is ok to execute; otherwise 0. + */ +static ERTS_INLINE int +check_task_for_exec(Port *pp, + erts_aint32_t flags, + ErtsPortTask **execqp, + int *processing_busy_q_p, + ErtsPortTask *ptp) +{ + + if (!*processing_busy_q_p) { + /* Processing normal queue */ + + ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); + + if ((flags & ERTS_PTS_FLG_BUSY) + && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) { + + busy_wait_move_to_busy_queue(pp, ptp); + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + + return 0; + } + + if (pp->sched.taskq.local.busy.last + && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) { + + int res = !check_sig_dep_move_to_busy_queue(pp, ptp); + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + + return res; + } + + } + else { + /* Processing busy queue */ + + ASSERT(!(flags & ERTS_PTS_FLG_BUSY)); + + ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p); + + popped_from_busy_queue(pp, ptp, !*execqp); + + if (!*execqp) { + *execqp = pp->sched.taskq.local.first; + *processing_busy_q_p = 0; + } + + ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p); + + } + + return 1; +} + +static ErtsPortTask * +fetch_in_queue(Port *pp, ErtsPortTask **execqp) +{ + ErtsPortTask *ptp; + ErtsPortTaskHandleList *free_nshp = NULL; + + erts_port_task_sched_lock(&pp->sched); + + ptp = pp->sched.taskq.in.first; + pp->sched.taskq.in.first = NULL; + pp->sched.taskq.in.last = NULL; + if (ptp) + *execqp = ptp->u.alive.next; + else + erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_TASKS); + + + if (pp->sched.taskq.local.busy.nosuspend) + free_nshp = get_free_nosuspend_handles(pp); + + erts_port_task_sched_unlock(&pp->sched); + + if (free_nshp) + free_nosuspend_handles(free_nshp); + + return ptp; +} + +static ERTS_INLINE ErtsPortTask * +select_task_for_exec(Port *pp, + ErtsPortTask **execqp, + int *processing_busy_q_p) +{ + ErtsPortTask *ptp; + erts_aint32_t flags; + + flags = select_queue_for_exec(pp, execqp, processing_busy_q_p); + + while (1) { + ptp = *execqp; + if (ptp) + *execqp = ptp->u.alive.next; + else { + ptp = fetch_in_queue(pp, execqp); + if (!ptp) + return NULL; + } + if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp)) + return ptp; + } +} + /* * Abort a scheduled task. */ @@ -358,6 +1030,12 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) &erts_port_task_outstanding_io_tasks) > 0); erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); break; + case ERTS_PORT_TASK_PROC_SIG: + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT, + &ptp->u.alive.td.psig.data); + break; default: break; } @@ -373,6 +1051,88 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) return res; } +void +erts_port_task_abort_nosuspend_tasks(Port *pp) +{ + ErtsPortTaskHandleList *abort_list; +#ifdef ERTS_SMP + ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID; +#endif + + erts_port_task_sched_lock(&pp->sched); + erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_NS_TASKS); + abort_list = pp->sched.taskq.local.busy.nosuspend; + pp->sched.taskq.local.busy.nosuspend = NULL; + erts_port_task_sched_unlock(&pp->sched); + + while (abort_list) { +#ifdef DEBUG + ErtsPortTaskHandle *saved_pthp; +#endif + ErtsPortTaskType type; + ErtsPortTaskTypeData td; + ErtsPortTaskHandle *pthp; + ErtsPortTask *ptp; + ErtsPortTaskHandleList *pthlp; + erts_aint32_t old_state; + + pthlp = abort_list; + abort_list = pthlp->u.next; + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + dhndl = erts_thr_progress_unmanaged_delay(); +#endif + + pthp = &pthlp->handle; + ptp = handle2task(pthp); + if (!ptp) { +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue(dhndl); +#endif + schedule_port_task_handle_list_free(pthlp); + continue; + } + +#ifdef DEBUG + saved_pthp = ptp->u.alive.handle; + ERTS_SMP_READ_MEMORY_BARRIER; + old_state = erts_smp_atomic32_read_nob(&ptp->state); + if (old_state == ERTS_PT_STATE_SCHEDULED) { + ASSERT(saved_pthp == pthp); + } +#endif + + old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + ERTS_PT_STATE_ABORTED, + ERTS_PT_STATE_SCHEDULED); + if (old_state != ERTS_PT_STATE_SCHEDULED) { + /* Task already aborted, executing, or executed */ +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue(dhndl); +#endif + schedule_port_task_handle_list_free(pthlp); + continue; + } + + reset_port_task_handle(pthp); + + type = ptp->type; + td = ptp->u.alive.td; + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue(dhndl); +#endif + schedule_port_task_handle_list_free(pthlp); + + abort_nosuspend_task(type, &td); + } +} + /* * Schedule a task. */ @@ -416,18 +1176,23 @@ erts_port_task_schedule(Eterm id, if (!pp) goto fail; - ptp = port_task_alloc(); + if (type != ERTS_PORT_TASK_PROC_SIG) { + ptp = port_task_alloc(); - ptp->type = type; + ptp->type = type; + ptp->u.alive.flags = 0; - erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); + erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); + + set_handle(ptp, pthp); + } switch (type) { case ERTS_PORT_TASK_INPUT: case ERTS_PORT_TASK_OUTPUT: { va_list argp; va_start(argp, type); - ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent); + ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); va_end(argp); erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; @@ -435,18 +1200,32 @@ erts_port_task_schedule(Eterm id, case ERTS_PORT_TASK_EVENT: { va_list argp; va_start(argp, type); - ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent); - ptp->u.alive.u.io.event_data = va_arg(argp, ErlDrvEventData); + ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); + ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData); va_end(argp); erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } + case ERTS_PORT_TASK_PROC_SIG: { + ErtsProc2PortSigData *sigdp; + va_list argp; + ASSERT(!pthp); + va_start(argp, type); + sigdp = va_arg(argp, ErtsProc2PortSigData *); + ptp = p2p_sig_data_to_task(sigdp); + ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback); + ptp->u.alive.flags |= va_arg(argp, int); + va_end(argp); + if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND) + save_nosuspend_handle(pp, ptp); + else + set_handle(ptp, pthp); + break; + } default: break; } - set_handle(ptp, pthp); - act = enqueue_task(pp, ptp); if (act & ERTS_PTS_FLG_EXIT) { reset_handle(ptp); @@ -572,6 +1351,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; ErtsPortTask *execq; + int processing_busy_q; int res = 0; int reds = ERTS_PORT_REDS_EXECUTE; erts_aint_t io_tasks_executed = 0; @@ -606,7 +1386,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_spin_unlock(&erts_sched_stat.lock); } - prepare_exec(pp, &execq); + prepare_exec(pp, &execq, &processing_busy_q); erts_smp_port_lock(pp); @@ -617,14 +1397,14 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) fpe_was_unmasked = erts_block_fpe(); - state = erts_atomic_read_nob(&pp->state); + state = erts_atomic32_read_nob(&pp->state); goto begin_handle_tasks; while (1) { erts_aint32_t task_state; ErtsPortTask *ptp; - ptp = pop_task(pp, &execq); + ptp = select_task_for_exec(pp, &execq, &processing_busy_q); if (!ptp) break; @@ -645,40 +1425,44 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; - if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { + if (!(state & ERTS_PORT_SFLGS_DEAD)) { DTRACE_DRIVER(driver_timeout, pp); (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); } break; case ERTS_PORT_TASK_INPUT: reds += ERTS_PORT_REDS_INPUT; - ASSERT((erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLGS_DEAD) == 0); + ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, - ptp->u.alive.u.io.event); + ptp->u.alive.td.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: reds += ERTS_PORT_REDS_OUTPUT; - ASSERT((erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLGS_DEAD) == 0); + ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, - ptp->u.alive.u.io.event); + ptp->u.alive.td.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_EVENT: reds += ERTS_PORT_REDS_EVENT; - ASSERT((erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLGS_DEAD) == 0); + ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, - ptp->u.alive.u.io.event, - ptp->u.alive.u.io.event_data); + ptp->u.alive.td.io.event, + ptp->u.alive.td.io.event_data); io_tasks_executed++; break; + case ERTS_PORT_TASK_PROC_SIG: + ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); + reds += ptp->u.alive.td.psig.callback(pp, + state, + ERTS_PROC2PORT_SIG_EXEC, + &ptp->u.alive.td.psig.data); + break; case ERTS_PORT_TASK_DIST_CMD: reds += erts_dist_command(pp, CONTEXT_REDS-reds); break; @@ -740,7 +1524,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); #endif - active = finalize_exec(pp, &execq); + active = finalize_exec(pp, &execq, processing_busy_q); erts_port_release(pp); @@ -858,24 +1642,30 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp) break; case ERTS_PORT_TASK_INPUT: erts_stale_drv_select(pp->common.id, - ptp->u.alive.u.io.event, + ptp->u.alive.td.io.event, DO_READ, 1); break; case ERTS_PORT_TASK_OUTPUT: erts_stale_drv_select(pp->common.id, - ptp->u.alive.u.io.event, + ptp->u.alive.td.io.event, DO_WRITE, 1); break; case ERTS_PORT_TASK_EVENT: erts_stale_drv_select(pp->common.id, - ptp->u.alive.u.io.event, + ptp->u.alive.td.io.event, 0, 1); break; case ERTS_PORT_TASK_DIST_CMD: break; + case ERTS_PORT_TASK_PROC_SIG: + ptp->u.alive.td.psig.callback(NULL, + ERTS_PORT_SFLG_INVALID, + ERTS_PROC2PORT_SIG_ABORT_CLOSED, + &ptp->u.alive.td.psig.data); + break; default: erl_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", @@ -944,4 +1734,5 @@ erts_port_task_init(void) erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks, (erts_aint_t) 0); init_port_task_alloc(); + init_busy_caller_table_alloc(); } |