/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2006-2017. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
* Description: Scheduling of port tasks
*
* Author: Rickard Green
*/
#define ERL_PORT_TASK_C__
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "global.h"
#include "erl_port_task.h"
#include "dist.h"
#include "erl_check_io.h"
#include "dtrace-wrapper.h"
#include "lttng-wrapper.h"
#include "erl_check_io.h"
#include <stdarg.h>
/*
* ERTS_PORT_CALLBACK_VREDS: Limit the amount of callback calls we do...
*/
#define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20)
#if defined(DEBUG) && 0
#define ERTS_HARD_DEBUG_TASK_QUEUES
#else
#undef ERTS_HARD_DEBUG_TASK_QUEUES
#endif
#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
chk_task_queues((PP), (EQ), (PBQ))
#else
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
#endif
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
if (DTRACE_ENABLED(PROBE_NAME)) { \
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
\
dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \
dtrace_port_str(PP, port_str); \
DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \
}
#else
#define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
#endif
#ifdef USE_LTTNG_VM_TRACEPOINTS
#define LTTNG_DRIVER(TRACEPOINT, PP) \
if (LTTNG_ENABLED(TRACEPOINT)) { \
lttng_decl_portbuf(port_str); \
lttng_decl_procbuf(proc_str); \
lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str); \
lttng_port_to_str((PP), port_str); \
LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name); \
}
#else
#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
#endif
#define ERTS_LC_VERIFY_RQ(RQ, PP) \
do { \
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); \
ERTS_LC_ASSERT((RQ) == ((ErtsRunQueue *) \
erts_atomic_read_nob(&(PP)->run_queue))); \
} while (0)
#define ERTS_PT_STATE_SCHEDULED 0
#define ERTS_PT_STATE_ABORTED 1
#define ERTS_PT_STATE_EXECUTING 2
typedef union {
struct { /* I/O tasks */
ErlDrvEvent event;
} io;
struct {
ErtsProc2PortSigCallback callback;
ErtsProc2PortSigData data;
} psig;
} ErtsPortTaskTypeData;
struct ErtsPortTask_ {
erts_atomic32_t state;
ErtsPortTaskType type;
union {
struct {
ErtsPortTask *next;
ErtsPortTaskHandle *handle;
int flags;
Uint32 ref[ERTS_MAX_REF_NUMBERS];
ErtsPortTaskTypeData td;
} alive;
ErtsThrPrgrLaterOp release;
} u;
};
struct ErtsPortTaskHandleList_ {
ErtsPortTaskHandle handle;
union {
ErtsPortTaskHandleList *next;
ErtsThrPrgrLaterOp release;
} u;
};
typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
struct ErtsPortTaskBusyCaller_ {
ErtsPortTaskBusyCaller *next;
Eterm caller;
SWord count;
ErtsPortTask *last;
};
#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
struct ErtsPortTaskBusyCallerTable_ {
ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
ErtsPortTaskBusyCaller pre_alloc_busy_caller;
};
static void begin_port_cleanup(Port *pp,
ErtsPortTask **execq,
int *processing_busy_q_p);
ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task,
ErtsPortTask,
1000,
ERTS_ALC_T_PORT_TASK)
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
ErtsPortTaskBusyCallerTable,
50,
ERTS_ALC_T_BUSY_CALLER_TAB)
static void
call_port_task_free(void *vptp)
{
port_task_free((ErtsPortTask *) vptp);
}
static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
(void *) ptp,
&ptp->u.release,
sizeof(ErtsPortTask));
}
static ERTS_INLINE ErtsPortTask *
p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
{
ErtsPortTask *ptp;
char *ptr = (char *) sigdp;
ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
ptp = (ErtsPortTask *) ptr;
ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
return ptp;
}
static ERTS_INLINE ErtsProc2PortSigData *
p2p_sig_data_init(ErtsPortTask *ptp)
{
ptp->type = ERTS_PORT_TASK_PROC_SIG;
ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
return &ptp->u.alive.td.psig.data;
}
ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data(void)
{
ErtsPortTask *ptp = port_task_alloc();
return p2p_sig_data_init(ptp);
}
ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr)
{
ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK,
sizeof(ErtsPortTask) + extra);
*extra_ptr = ptp+1;
return p2p_sig_data_init(ptp);
}
void
erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp)
{
schedule_port_task_free(p2p_sig_data_to_task(sigdp));
}
static ERTS_INLINE Eterm
task_caller(ErtsPortTask *ptp)
{
Eterm caller;
ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
caller = ptp->u.alive.td.psig.data.caller;
ASSERT(is_internal_pid(caller) || is_internal_port(caller));
return caller;
}
/*
* Busy queue management
*/
static ERTS_INLINE int
caller2bix(Eterm caller)
{
ASSERT(is_internal_pid(caller) || is_internal_port(caller));
return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
}
static void
popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
{
ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
Eterm caller = task_caller(ptp);
int bix = caller2bix(caller);
ASSERT(is_internal_pid(caller));
ASSERT(tabp);
bcp = tabp->bucket[bix];
prev_bcpp = &tabp->bucket[bix];
ASSERT(bcp);
while (bcp->caller != caller) {
prev_bcpp = &bcp->next;
bcp = bcp->next;
ASSERT(bcp);
}
ASSERT(bcp->count > 0);
if (--bcp->count != 0) {
ASSERT(!last);
}
else {
*prev_bcpp = bcp->next;
if (bcp == &tabp->pre_alloc_busy_caller)
bcp->caller = am_undefined;
else
erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
if (last) {
#ifdef DEBUG
erts_aint32_t flags =
#endif
erts_atomic32_read_band_nob(
&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
ASSERT(!tabp->bucket[bix]);
}
#endif
busy_caller_table_free(tabp);
pp->sched.taskq.local.busy.first = NULL;
pp->sched.taskq.local.busy.last = NULL;
pp->sched.taskq.local.busy.table = NULL;
}
}
}
static void
busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
Eterm caller = task_caller(ptp);
ErtsPortTaskBusyCaller *bcp;
int bix;
ASSERT(is_internal_pid(caller));
/*
* Port is busy and this task type needs to wait until not busy.
*/
ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);
ptp->u.alive.next = NULL;
if (pp->sched.taskq.local.busy.last) {
ASSERT(pp->sched.taskq.local.busy.first);
pp->sched.taskq.local.busy.last->u.alive.next = ptp;
}
else {
int i;
#ifdef DEBUG
erts_aint32_t flags;
#endif
pp->sched.taskq.local.busy.first = ptp;
#ifdef DEBUG
flags =
#endif
erts_atomic32_read_bor_nob(&pp->sched.flags,
ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
ASSERT(!tabp);
tabp = busy_caller_table_alloc();
pp->sched.taskq.local.busy.table = tabp;
for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
tabp->bucket[i] = NULL;
tabp->pre_alloc_busy_caller.caller = am_undefined;
}
pp->sched.taskq.local.busy.last = ptp;
bix = caller2bix(caller);
ASSERT(tabp);
bcp = tabp->bucket[bix];
while (bcp && bcp->caller != caller)
bcp = bcp->next;
if (bcp)
bcp->count++;
else {
if (tabp->pre_alloc_busy_caller.caller == am_undefined)
bcp = &tabp->pre_alloc_busy_caller;
else
bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
sizeof(ErtsPortTaskBusyCaller));
bcp->caller = caller;
bcp->count = 1;
bcp->next = tabp->bucket[bix];
tabp->bucket[bix] = bcp;
}
bcp->last = ptp;
}
static ERTS_INLINE int
check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
ErtsPortTask *last_ptp;
ErtsPortTaskBusyCaller *bcp;
int bix;
Eterm caller;
ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
ASSERT(pp->sched.taskq.local.busy.last);
ASSERT(tabp);
/*
* We are either not busy, or the task does not imply wait on busy port.
* However, due to the signaling order requirements the task might depend
* on other tasks in the busy queue.
*/
caller = task_caller(ptp);
bix = caller2bix(caller);
bcp = tabp->bucket[bix];
while (bcp && bcp->caller != caller)
bcp = bcp->next;
if (!bcp)
return 0;
/*
* There are other tasks that we depend on in the busy queue;
* move into busy queue.
*/
bcp->count++;
last_ptp = bcp->last;
ptp->u.alive.next = last_ptp->u.alive.next;
if (!ptp->u.alive.next) {
ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
pp->sched.taskq.local.busy.last = ptp;
}
last_ptp->u.alive.next = ptp;
bcp->last = ptp;
return 1;
}
static void
no_sig_dep_move_from_busyq(Port *pp)
{
ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
ErtsPortTask *first_ptp, *last_ptp, *ptp;
ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;
/*
* Move tasks at the head of the busy queue that no longer
* have any dependencies to busy wait tasks into the ordinary
* queue.
*/
first_ptp = ptp = pp->sched.taskq.local.busy.first;
ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
ASSERT(tabp);
do {
Eterm caller = task_caller(ptp);
if (!bcp || bcp->caller != caller) {
int bix = caller2bix(caller);
prev_bcpp = &tabp->bucket[bix];
bcp = tabp->bucket[bix];
ASSERT(bcp);
while (bcp->caller != caller) {
ASSERT(bcp);
prev_bcpp = &bcp->next;
bcp = bcp->next;
}
}
ASSERT(bcp->caller == caller);
ASSERT(bcp->count > 0);
if (--bcp->count == 0) {
*prev_bcpp = bcp->next;
if (bcp == &tabp->pre_alloc_busy_caller)
bcp->caller = am_undefined;
else
erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
}
last_ptp = ptp;
ptp = ptp->u.alive.next;
} while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
if (!pp->sched.taskq.local.busy.first) {
#ifdef DEBUG
int bix;
erts_aint32_t flags =
#endif
erts_atomic32_read_band_nob(
&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
ASSERT(!tabp->bucket[bix]);
}
#endif
busy_caller_table_free(tabp);
pp->sched.taskq.local.busy.last = NULL;
pp->sched.taskq.local.busy.table = NULL;
}
last_ptp->u.alive.next = pp->sched.taskq.local.first;
pp->sched.taskq.local.first = first_ptp;
}
#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
static void
chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
{
Sint tot_count, tot_table_count;
int bix;
ErtsPortTask *ptp, *last;
ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
ErtsPortTaskBusyCaller *bcp;
if (!first) {
ASSERT(!tabp);
ASSERT(!pp->sched.taskq.local.busy.last);
ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
return;
}
ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(tabp);
tot_count = 0;
ptp = first;
while (ptp) {
Sint count = 0;
Eterm caller = task_caller(ptp);
int bix = caller2bix(caller);
for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
if (bcp->caller == caller)
break;
ASSERT(bcp && bcp->caller == caller);
ASSERT(bcp->last);
while (1) {
ErtsPortTask *ptp2;
ASSERT(caller == task_caller(ptp));
count++;
tot_count++;
last = ptp;
for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
ASSERT(ptp != ptp2);
}
if (ptp == bcp->last)
break;
ptp = ptp->u.alive.next;
}
ASSERT(count == bcp->count);
ptp = ptp->u.alive.next;
}
tot_table_count = 0;
for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
tot_table_count += bcp->count;
}
ASSERT(tot_count == tot_table_count);
ASSERT(last == pp->sched.taskq.local.busy.last);
}
#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */
/*
* Task handle manipulation.
*/
static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
erts_atomic_set_relb(pthp, (erts_aint_t) NULL);
}
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
return (ErtsPortTask *) erts_atomic_read_acqb(pthp);
}
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
if (ptp->u.alive.handle) {
ASSERT(ptp == handle2task(ptp->u.alive.handle));
reset_port_task_handle(ptp->u.alive.handle);
}
}
static ERTS_INLINE void
reset_executed_io_task_handle(ErtsPortTask *ptp)
{
if (ptp->u.alive.handle) {
ASSERT(ptp == handle2task(ptp->u.alive.handle));
/* The port task handle is reset inside task_executed */
erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
reset_port_task_handle);
}
}
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
ptp->u.alive.handle = pthp;
if (pthp) {
erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
ASSERT(ptp == handle2task(ptp->u.alive.handle));
}
}
static ERTS_INLINE void
set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
ptp->u.alive.handle = NULL;
if (pthp) {
/*
* IMPORTANT! Task either need to be aborted, or task handle
* need to be detached before thread progress has been made.
*/
erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
}
}
/*
* Busy port queue management
*/
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
erts_aint32_t flags,
ErtsPortTaskBusyPortQ *bpq)
{
ErlDrvSizeT qsize, low;
int resume_procs = 0;
ASSERT(bpq);
ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
erts_port_task_sched_lock(&pp->sched);
qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
if (qsize < low) {
erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
| ERTS_PTS_FLG_BUSY_PORT_Q);
flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask);
if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
resume_procs = 1;
}
else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
flags = erts_atomic32_read_band_relb(&pp->sched.flags,
~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
}
erts_port_task_sched_unlock(&pp->sched);
if (resume_procs)
erts_port_resume_procs(pp);
return flags;
}
static ERTS_INLINE void
aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
ErtsPortTaskBusyPortQ *bpq;
erts_aint32_t flags;
ErlDrvSizeT qsz;
ASSERT(pp->sched.taskq.bpq);
if (size == 0)
return;
bpq = pp->sched.taskq.bpq;
qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) -size);
ASSERT(qsz + size > qsz);
flags = erts_atomic32_read_nob(&pp->sched.flags);
ASSERT(pp->sched.taskq.bpq);
if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
| ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
return;
if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low))
erts_atomic32_read_bor_nob(&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
ErtsPortTaskBusyPortQ *bpq;
erts_aint32_t flags;
ErlDrvSizeT qsz;
ASSERT(pp->sched.taskq.bpq);
if (size == 0)
return;
bpq = pp->sched.taskq.bpq;
qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) -size);
ASSERT(qsz + size > qsz);
flags = erts_atomic32_read_nob(&pp->sched.flags);
if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
return;
if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low))
check_unset_busy_port_q(pp, flags, bpq);
}
static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
ErtsProc2PortSigData *sigdp,
erts_aint32_t flags)
{
ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
if (sigdp && bpq) {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
if (size) {
erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) size);
ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
ASSERT(qsz - size < qsz);
if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
flags = erts_atomic32_read_bor_acqb(&pp->sched.flags,
ERTS_PTS_FLG_BUSY_PORT_Q);
flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size);
if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) {
flags = (erts_atomic32_read_bor_relb(
&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
}
}
ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
}
}
return flags;
}
/*
* erl_drv_busy_msgq_limits() is called by drivers either reading or
* writing the limits.
*
* A limit of zero is interpreted as a read only request (using a
* limit of zero would not be useful). Other values are interpreted
* as a write-read request.
*/
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
Port *pp = erts_drvport2port(dport);
ErtsPortTaskBusyPortQ *bpq;
int written = 0, resume_procs = 0;
ErlDrvSizeT low, high;
if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
if (lowp)
*lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
if (highp)
*highp = ERL_DRV_BUSY_MSGQ_DISABLED;
return;
}
low = lowp ? *lowp : 0;
high = highp ? *highp : 0;
erts_port_task_sched_lock(&pp->sched);
if (low == ERL_DRV_BUSY_MSGQ_DISABLED
|| high == ERL_DRV_BUSY_MSGQ_DISABLED) {
/* Disable busy msgq feature */
erts_aint32_t flags;
pp->sched.taskq.bpq = NULL;
flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags);
if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
resume_procs = 1;
}
else {
if (!low)
low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
else {
if (bpq->high < low)
bpq->high = low;
erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
written = 1;
}
if (!high)
high = bpq->high;
else {
if (low > high) {
low = high;
erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
}
bpq->high = high;
written = 1;
}
if (written) {
ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
if (size > high)
erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_BUSY_PORT_Q);
else if (size < low)
erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
}
erts_port_task_sched_unlock(&pp->sched);
if (resume_procs)
erts_port_resume_procs(pp);
if (lowp)
*lowp = low;
if (highp)
*highp = high;
}
/*
* No-suspend handles.
*/
static void
free_port_task_handle_list(void *vpthlp)
{
erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
}
static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
{
erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
(void *) pthlp,
&pthlp->u.release,
sizeof(ErtsPortTaskHandleList));
}
static ERTS_INLINE void
abort_signal_task(Port *pp,
int abort_type,
ErtsPortTaskType type,
ErtsPortTaskTypeData *tdp,
int bpq_data)
{
ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
if (!bpq_data)
tdp->psig.callback(NULL,
ERTS_PORT_SFLG_INVALID,
abort_type,
&tdp->psig.data);
else {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
tdp->psig.callback(NULL,
ERTS_PORT_SFLG_INVALID,
abort_type,
&tdp->psig.data);
aborted_proc2port_data(pp, size);
}
}
static ERTS_INLINE void
abort_nosuspend_task(Port *pp,
ErtsPortTaskType type,
ErtsPortTaskTypeData *tdp,
int bpq_data)
{
abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, type, tdp, bpq_data);
}
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
{
ErtsPortTaskHandleList *nshp, *last_nshp = NULL;
ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));
nshp = pp->sched.taskq.local.busy.nosuspend;
while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
last_nshp = nshp;
nshp = nshp->u.next;
}
if (!last_nshp)
nshp = NULL;
else {
nshp = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
last_nshp->u.next = NULL;
if (!pp->sched.taskq.local.busy.nosuspend)
erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_NS_TASKS);
}
return nshp;
}
static void
free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
{
while (free_nshp) {
ErtsPortTaskHandleList *nshp = free_nshp;
free_nshp = free_nshp->u.next;
schedule_port_task_handle_list_free(nshp);
}
}
/*
* Port queue operations
*/
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
pp->sched.next = NULL;
if (runq->ports.end) {
ASSERT(runq->ports.start);
runq->ports.end->sched.next = pp;
}
else {
ASSERT(!runq->ports.start);
runq->ports.start = pp;
}
runq->ports.end = pp;
ASSERT(runq->ports.start && runq->ports.end);
erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
erts_non_empty_runq(runq);
}
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
Port *pp = runq->ports.start;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
if (!pp) {
ASSERT(!runq->ports.end);
}
else {
runq->ports.start = runq->ports.start->sched.next;
if (!runq->ports.start) {
ASSERT(runq->ports.end == pp);
runq->ports.end = NULL;
}
erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}
ASSERT(runq->ports.start || !runq->ports.end);
ASSERT(runq->ports.end || !runq->ports.start);
return pp;
}
/*
* Task queue operations
*/
static ERTS_INLINE int
enqueue_task(Port *pp,
ErtsPortTask *ptp,
ErtsProc2PortSigData *sigdp,
ErtsPortTaskHandleList *ns_pthlp,
erts_aint32_t *flagsp)
{
int res;
erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
erts_aint32_t flags;
ptp->u.alive.next = NULL;
if (ns_pthlp)
fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
erts_port_task_sched_lock(&pp->sched);
flags = erts_atomic32_read_nob(&pp->sched.flags);
if (flags & fail_flags)
res = 0;
else {
if (ns_pthlp) {
ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
}
if (pp->sched.taskq.in.last) {
ASSERT(pp->sched.taskq.in.first);
ASSERT(!pp->sched.taskq.in.last->u.alive.next);
pp->sched.taskq.in.last->u.alive.next = ptp;
}
else {
ASSERT(!pp->sched.taskq.in.first);
pp->sched.taskq.in.first = ptp;
}
pp->sched.taskq.in.last = ptp;
flags = enqueue_proc2port_data(pp, sigdp, flags);
res = 1;
}
erts_port_task_sched_unlock(&pp->sched);
*flagsp = flags;
return res;
}
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags);
if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
}
else {
*execqp = pp->sched.taskq.local.busy.first;
*processing_busy_q_p = 1;
}
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
while (1) {
erts_aint32_t new, exp;
new = exp = act;
new &= ~ERTS_PTS_FLG_IN_RUNQ;
new |= ERTS_PTS_FLG_EXEC;
act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);
ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);
if (exp == act)
break;
}
}
/* finalize_exec() return value != 0 if port should remain active */
static ERTS_INLINE int
finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
erts_aint32_t act;
unsigned int prof_runnable_ports;
if (!processing_busy_q)
pp->sched.taskq.local.first = *execq;
else {
pp->sched.taskq.local.busy.first = *execq;
ASSERT(*execq);
}
ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);
*execq = NULL;
act = erts_atomic32_read_nob(&pp->sched.flags);
if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
prof_runnable_ports = erts_system_profile_flags.runnable_ports;
if (prof_runnable_ports)
erts_port_task_sched_lock(&pp->sched);
while (1) {
erts_aint32_t new, exp;
new = exp = act;
new &= ~ERTS_PTS_FLG_EXEC;
if (act & ERTS_PTS_FLG_HAVE_TASKS)
new |= ERTS_PTS_FLG_IN_RUNQ;
act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));
if (exp == act)
break;
}
if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
/* trace port scheduling, out */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS))
trace_sched_ports(pp, am_out);
if (prof_runnable_ports) {
if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS)))
profile_runnable_port(pp, am_inactive);
erts_port_task_sched_unlock(&pp->sched);
}
}
return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}
static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags);
if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
if (flags & ERTS_PTS_FLG_BUSY_PORT) {
if (*processing_busy_q_p) {
ErtsPortTask *ptp;
ptp = pp->sched.taskq.local.busy.first = *execqp;
if (!ptp)
pp->sched.taskq.local.busy.last = NULL;
else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
no_sig_dep_move_from_busyq(pp);
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
}
return flags;
}
/* Not busy */
if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
pp->sched.taskq.local.first = *execqp;
*execqp = pp->sched.taskq.local.busy.first;
*processing_busy_q_p = 1;
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
}
return flags;
}
/*
* check_task_for_exec() returns a value !0 if the task
* is ok to execute; otherwise 0.
*/
static ERTS_INLINE int
check_task_for_exec(Port *pp,
erts_aint32_t flags,
ErtsPortTask **execqp,
int *processing_busy_q_p,
ErtsPortTask *ptp)
{
if (!*processing_busy_q_p) {
/* Processing normal queue */
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
if ((flags & ERTS_PTS_FLG_BUSY_PORT)
&& (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
busy_wait_move_to_busy_queue(pp, ptp);
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
return 0;
}
if (pp->sched.taskq.local.busy.last
&& (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {
int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
return res;
}
}
else {
/* Processing busy queue */
ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
popped_from_busy_queue(pp, ptp, !*execqp);
if (!*execqp) {
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
}
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
}
return 1;
}
static ErtsPortTask *
fetch_in_queue(Port *pp, ErtsPortTask **execqp)
{
ErtsPortTask *ptp;
ErtsPortTaskHandleList *free_nshp = NULL;
erts_port_task_sched_lock(&pp->sched);
ptp = pp->sched.taskq.in.first;
pp->sched.taskq.in.first = NULL;
pp->sched.taskq.in.last = NULL;
if (ptp)
*execqp = ptp->u.alive.next;
else
erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_TASKS);
if (pp->sched.taskq.local.busy.nosuspend)
free_nshp = get_free_nosuspend_handles(pp);
erts_port_task_sched_unlock(&pp->sched);
if (free_nshp)
free_nosuspend_handles(free_nshp);
return ptp;
}
static ERTS_INLINE ErtsPortTask *
select_task_for_exec(Port *pp,
ErtsPortTask **execqp,
int *processing_busy_q_p)
{
ErtsPortTask *ptp;
erts_aint32_t flags;
flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
while (1) {
ptp = *execqp;
if (ptp)
*execqp = ptp->u.alive.next;
else {
ptp = fetch_in_queue(pp, execqp);
if (!ptp)
return NULL;
}
if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
return ptp;
}
}
/*
* Cut time slice
*/
int
erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
{
Port *pp = erts_drvport2port(dprt);
if (pp == ERTS_INVALID_ERL_DRV_PORT)
return -1;
if (percent < 1)
percent = 1;
else if (100 < percent)
percent = 100;
pp->reds += percent*((CONTEXT_REDS+99)/100);
if (pp->reds < CONTEXT_REDS)
return 0;
pp->reds = CONTEXT_REDS;
return 1;
}
void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
reset_port_task_handle(pthp);
}
/*
* Abort a scheduled task.
*/
int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
int res;
ErtsPortTask *ptp;
ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
ptp = handle2task(pthp);
if (!ptp)
res = -1;
else {
erts_aint32_t old_state;
#ifdef DEBUG
ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
ERTS_THR_READ_MEMORY_BARRIER;
old_state = erts_atomic32_read_nob(&ptp->state);
if (old_state == ERTS_PT_STATE_SCHEDULED) {
ASSERT(!saved_pthp || saved_pthp == pthp);
}
#endif
old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_ABORTED,
ERTS_PT_STATE_SCHEDULED);
if (old_state != ERTS_PT_STATE_SCHEDULED)
res = - 1; /* Task already aborted, executing, or executed */
else {
reset_port_task_handle(pthp);
res = 0;
}
}
erts_thr_progress_unmanaged_continue(dhndl);
return res;
}
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
ErtsPortTaskHandleList *abort_list;
ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
erts_port_task_sched_lock(&pp->sched);
erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_NS_TASKS);
abort_list = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = NULL;
erts_port_task_sched_unlock(&pp->sched);
while (abort_list) {
#ifdef DEBUG
ErtsPortTaskHandle *saved_pthp;
#endif
ErtsPortTaskType type;
ErtsPortTaskTypeData td;
ErtsPortTaskHandle *pthp;
ErtsPortTask *ptp;
ErtsPortTaskHandleList *pthlp;
erts_aint32_t old_state;
pthlp = abort_list;
abort_list = pthlp->u.next;
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
dhndl = erts_thr_progress_unmanaged_delay();
pthp = &pthlp->handle;
ptp = handle2task(pthp);
if (!ptp) {
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
schedule_port_task_handle_list_free(pthlp);
continue;
}
#ifdef DEBUG
saved_pthp = ptp->u.alive.handle;
ERTS_THR_READ_MEMORY_BARRIER;
old_state = erts_atomic32_read_nob(&ptp->state);
if (old_state == ERTS_PT_STATE_SCHEDULED) {
ASSERT(saved_pthp == pthp);
}
#endif
old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_ABORTED,
ERTS_PT_STATE_SCHEDULED);
if (old_state != ERTS_PT_STATE_SCHEDULED) {
/* Task already aborted, executing, or executed */
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
schedule_port_task_handle_list_free(pthlp);
continue;
}
reset_port_task_handle(pthp);
type = ptp->type;
td = ptp->u.alive.td;
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
schedule_port_task_handle_list_free(pthlp);
abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
}
}
/*
* Schedule a task.
*/
int
erts_port_task_schedule(Eterm id,
ErtsPortTaskHandle *pthp,
ErtsPortTaskType type,
...)
{
ErtsProc2PortSigData *sigdp = NULL;
ErtsPortTaskHandleList *ns_pthlp = NULL;
ErtsRunQueue *xrunq;
ErtsThrPrgrDelayHandle dhndl;
ErtsRunQueue *runq;
Port *pp;
ErtsPortTask *ptp = NULL;
erts_aint32_t act, add_flags;
unsigned int prof_runnable_ports;
ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp));
ASSERT(is_internal_port(id));
dhndl = erts_thr_progress_unmanaged_delay();
pp = erts_port_lookup_raw(id);
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
if (pp)
erts_port_inc_refc(pp);
erts_thr_progress_unmanaged_continue(dhndl);
}
if (type != ERTS_PORT_TASK_PROC_SIG) {
if (!pp)
goto fail;
ptp = port_task_alloc();
ptp->type = type;
ptp->u.alive.flags = 0;
erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
set_handle(ptp, pthp);
}
switch (type) {
case ERTS_PORT_TASK_INPUT:
case ERTS_PORT_TASK_OUTPUT: {
va_list argp;
va_start(argp, type);
ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
va_end(argp);
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
va_list argp;
va_start(argp, type);
sigdp = va_arg(argp, ErtsProc2PortSigData *);
ptp = p2p_sig_data_to_task(sigdp);
ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
ptp->u.alive.flags |= va_arg(argp, int);
va_end(argp);
if (!pp)
goto fail;
if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
set_tmp_handle(ptp, pthp);
else {
ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
sizeof(ErtsPortTaskHandleList));
set_handle(ptp, &ns_pthlp->handle);
}
break;
}
default:
break;
}
if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
reset_handle(ptp);
if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
goto abort_nosuspend;
else
goto fail;
}
add_flags = ERTS_PTS_FLG_HAVE_TASKS;
if (ns_pthlp)
add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;
prof_runnable_ports = erts_system_profile_flags.runnable_ports;
if (prof_runnable_ports)
erts_port_task_sched_lock(&pp->sched);
while (1) {
erts_aint32_t new, exp;
if ((act & add_flags) == add_flags
&& (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
goto done; /* Done */
new = exp = act;
new |= add_flags;
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
new |= ERTS_PTS_FLG_IN_RUNQ;
act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
if (exp == act) {
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
break; /* Need to enqueue port */
goto done; /* Done */
}
if (act & ERTS_PTS_FLG_EXIT)
goto done; /* Died after our task insert... */
}
if (prof_runnable_ports) {
if (!(act & ERTS_PTS_FLG_EXEC_IMM))
profile_runnable_port(pp, am_active);
erts_port_task_sched_unlock(&pp->sched);
prof_runnable_ports = 0;
}
/* Enqueue port on run-queue */
runq = erts_port_runq(pp);
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
ERTS_LC_ASSERT(runq != xrunq);
ERTS_LC_VERIFY_RQ(runq, pp);
if (xrunq) {
/* Emigrate port ... */
erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
erts_runq_unlock(runq);
runq = erts_port_runq(pp);
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
}
enqueue_port(runq, pp);
erts_runq_unlock(runq);
erts_notify_inc_runq(runq);
done:
if (prof_runnable_ports)
erts_port_task_sched_unlock(&pp->sched);
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
return 0;
abort_nosuspend:
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);
ASSERT(ns_pthlp);
erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
ASSERT(ptp);
port_task_free(ptp);
return 0;
fail:
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
if (ptp) {
abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT,
ptp->type, &ptp->u.alive.td, 0);
port_task_free(ptp);
}
if (ns_pthlp)
erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
return -1;
}
void
erts_port_task_free_port(Port *pp)
{
erts_aint32_t flags;
ErtsRunQueue *runq;
ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
if (!runq)
ERTS_INTERNAL_ERROR("Missing run-queue");
erts_port_task_sched_lock(&pp->sched);
flags = erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_EXIT);
erts_port_task_sched_unlock(&pp->sched);
erts_atomic32_read_bset_relb(&pp->state,
(ERTS_PORT_SFLG_CONNECTED
| ERTS_PORT_SFLG_EXITING
| ERTS_PORT_SFLG_CLOSING
| ERTS_PORT_SFLG_FREE),
ERTS_PORT_SFLG_FREE);
erts_runq_unlock(runq);
if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
begin_port_cleanup(pp, NULL, NULL);
}
/*
* Execute scheduled tasks of a port.
*
* erts_port_task_execute() is called by scheduler threads between
* scheduling of processes. Run-queue lock should be held by caller.
*/
void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
ErtsPortTask *execq;
int processing_busy_q;
int vreds = 0;
int reds = 0;
erts_aint_t io_tasks_executed = 0;
int fpe_was_unmasked;
erts_aint32_t state;
int active;
Uint64 start_time = 0;
ErtsSchedulerData *esdp = runq->scheduler;
ERTS_MSACC_PUSH_STATE_M();
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
pp = pop_port(runq);
if (!pp) {
goto done;
}
ERTS_LC_VERIFY_RQ(runq, pp);
erts_runq_unlock(runq);
*curr_port_pp = pp;
if (erts_sched_stat.enabled) {
Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
int migrated = old && old != esdp->no;
erts_spin_lock(&erts_sched_stat.lock);
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
if (migrated) {
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
}
erts_spin_unlock(&erts_sched_stat.lock);
}
prepare_exec(pp, &execq, &processing_busy_q);
erts_port_lock(pp);
/* trace port scheduling, in */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_in);
}
fpe_was_unmasked = erts_block_fpe();
state = erts_atomic32_read_nob(&pp->state);
pp->reds = ERTS_PORT_REDS_EXECUTE;
ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
goto begin_handle_tasks;
while (1) {
erts_aint32_t task_state;
ErtsPortTask *ptp;
ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
if (!ptp)
break;
task_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_EXECUTING,
ERTS_PT_STATE_SCHEDULED);
if (task_state != ERTS_PT_STATE_SCHEDULED) {
ASSERT(task_state == ERTS_PT_STATE_ABORTED);
goto aborted_port_task;
}
if (erts_system_monitor_long_schedule != 0) {
start_time = erts_timestamp_millis();
}
ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
ERTS_CHK_NO_PROC_LOCKS;
ASSERT(pp->drv_ptr);
switch (ptp->type) {
case ERTS_PORT_TASK_TIMEOUT:
reset_handle(ptp);
if (!ERTS_PTMR_IS_TIMED_OUT(pp))
reds = 0;
else {
ERTS_PTMR_CLEAR(pp);
reds = ERTS_PORT_REDS_TIMEOUT;
if (!(state & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
LTTNG_DRIVER(driver_timeout, pp);
if (IS_TRACED_FL(pp, F_TRACE_RECEIVE))
trace_port(pp, am_receive, am_timeout);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
}
break;
case ERTS_PORT_TASK_INPUT:
reds = ERTS_PORT_REDS_INPUT;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
LTTNG_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input
for input and output */
(*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
ptp->u.alive.td.io.event);
reset_executed_io_task_handle(ptp);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT:
reds = ERTS_PORT_REDS_OUTPUT;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
LTTNG_DRIVER(driver_ready_output, pp);
(*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
ptp->u.alive.td.io.event);
reset_executed_io_task_handle(ptp);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_PROC_SIG: {
ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
reset_handle(ptp);
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
if (!pp->sched.taskq.bpq)
reds = ptp->u.alive.td.psig.callback(pp,
state,
ERTS_PROC2PORT_SIG_EXEC,
sigdp);
else {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
reds = ptp->u.alive.td.psig.callback(pp,
state,
ERTS_PROC2PORT_SIG_EXEC,
sigdp);
dequeued_proc2port_data(pp, size);
}
break;
}
case ERTS_PORT_TASK_DIST_CMD:
reset_handle(ptp);
reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
break;
default:
erts_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
(int) ptp->type);
break;
}
reds += erts_port_driver_callback_epilogue(pp, &state);
if (start_time != 0) {
Sint64 diff = erts_timestamp_millis() - start_time;
if (diff > 0 && (Uint) diff > erts_system_monitor_long_schedule) {
monitor_long_schedule_port(pp,ptp->type,(Uint) diff);
}
}
start_time = 0;
aborted_port_task:
schedule_port_task_free(ptp);
begin_handle_tasks:
if (state & ERTS_PORT_SFLG_FREE) {
reds += ERTS_PORT_REDS_FREE;
begin_port_cleanup(pp, &execq, &processing_busy_q);
break;
}
vreds += ERTS_PORT_CALLBACK_VREDS;
reds += ERTS_PORT_CALLBACK_VREDS;
pp->reds += reds;
reds = 0;
if (pp->reds >= CONTEXT_REDS)
break;
}
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue));
active = finalize_exec(pp, &execq, processing_busy_q);
reds = pp->reds - vreds;
erts_port_release(pp);
*curr_port_pp = NULL;
erts_runq_lock(runq);
if (active) {
ErtsRunQueue *xrunq;
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
ERTS_LC_ASSERT(runq != xrunq);
ERTS_LC_VERIFY_RQ(runq, pp);
if (!xrunq) {
enqueue_port(runq, pp);
/* No need to notify ourselves about inc in runq. */
}
else {
/* Emigrate port... */
erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
erts_runq_unlock(runq);
xrunq = erts_port_runq(pp);
ASSERT(xrunq);
enqueue_port(xrunq, pp);
erts_runq_unlock(xrunq);
erts_notify_inc_runq(xrunq);
erts_runq_lock(runq);
}
}
done:
runq->scheduler->reductions += reds;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
}
static void
release_port(void *vport)
{
erts_port_dec_refc((Port *) vport);
}
static void
schedule_release_port(void *vport) {
Port *pp = (Port*)vport;
/* This is only used when a port release was ordered from a non-scheduler */
erts_schedule_thr_prgr_later_op(release_port,
(void *) pp,
&pp->common.u.release);
}
static void
begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
int i, max;
ErtsPortTaskBusyCallerTable *tabp;
ErtsPortTask *qs[3];
ErtsPortTaskHandleList *free_nshp = NULL;
ErtsProcList *plp;
ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
/*
* Abort remaining tasks...
*
* We want to process queues in the following order in order
* to preserve signal ordering guarantees:
* 1. Local busy queue
* 2. Local queue
* 3. In queue
*/
max = 0;
if (!execqp) {
if (pp->sched.taskq.local.busy.first)
qs[max++] = pp->sched.taskq.local.busy.first;
if (pp->sched.taskq.local.first)
qs[max++] = pp->sched.taskq.local.first;
}
else {
if (*processing_busy_q_p) {
if (*execqp)
qs[max++] = *execqp;
if (pp->sched.taskq.local.first)
qs[max++] = pp->sched.taskq.local.first;
}
else {
if (pp->sched.taskq.local.busy.first)
qs[max++] = pp->sched.taskq.local.busy.first;
if (*execqp)
qs[max++] = *execqp;
}
*execqp = NULL;
*processing_busy_q_p = 0;
}
pp->sched.taskq.local.busy.first = NULL;
pp->sched.taskq.local.busy.last = NULL;
pp->sched.taskq.local.first = NULL;
tabp = pp->sched.taskq.local.busy.table;
if (tabp) {
int bix;
for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
while (bcp) {
ErtsPortTaskBusyCaller *free_bcp = bcp;
bcp = bcp->next;
if (free_bcp != &tabp->pre_alloc_busy_caller)
erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
}
}
busy_caller_table_free(tabp);
pp->sched.taskq.local.busy.table = NULL;
}
erts_port_task_sched_lock(&pp->sched);
qs[max] = pp->sched.taskq.in.first;
pp->sched.taskq.in.first = NULL;
pp->sched.taskq.in.last = NULL;
erts_port_task_sched_unlock(&pp->sched);
if (qs[max])
max++;
for (i = 0; i < max; i++) {
while (1) {
erts_aint32_t state;
ErtsPortTask *ptp = qs[i];
if (!ptp)
break;
qs[i] = ptp->u.alive.next;
/* Normal case here is aborted tasks... */
state = erts_atomic32_read_nob(&ptp->state);
if (state == ERTS_PT_STATE_ABORTED)
goto aborted_port_task;
state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_EXECUTING,
ERTS_PT_STATE_SCHEDULED);
if (state != ERTS_PT_STATE_SCHEDULED) {
ASSERT(state == ERTS_PT_STATE_ABORTED);
goto aborted_port_task;
}
reset_handle(ptp);
switch (ptp->type) {
case ERTS_PORT_TASK_TIMEOUT:
break;
case ERTS_PORT_TASK_INPUT:
erts_stale_drv_select(pp->common.id,
ERTS_Port2ErlDrvPort(pp),
ptp->u.alive.td.io.event,
DO_READ,
1);
break;
case ERTS_PORT_TASK_OUTPUT:
erts_stale_drv_select(pp->common.id,
ERTS_Port2ErlDrvPort(pp),
ptp->u.alive.td.io.event,
DO_WRITE,
1);
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
case ERTS_PORT_TASK_PROC_SIG: {
ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
if (!pp->sched.taskq.bpq)
ptp->u.alive.td.psig.callback(NULL,
ERTS_PORT_SFLG_INVALID,
ERTS_PROC2PORT_SIG_ABORT_CLOSED,
sigdp);
else {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
ptp->u.alive.td.psig.callback(NULL,
ERTS_PORT_SFLG_INVALID,
ERTS_PROC2PORT_SIG_ABORT_CLOSED,
sigdp);
aborted_proc2port_data(pp, size);
}
break;
}
default:
erts_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
(int) ptp->type);
}
aborted_port_task:
schedule_port_task_free(ptp);
}
}
erts_atomic32_read_band_nob(&pp->sched.flags,
~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
|ERTS_PTS_FLG_HAVE_TASKS
|ERTS_PTS_FLGS_BUSY));
erts_port_task_sched_lock(&pp->sched);
/* Cleanup nosuspend handles... */
free_nshp = (pp->sched.taskq.local.busy.nosuspend
? get_free_nosuspend_handles(pp)
: NULL);
ASSERT(!pp->sched.taskq.local.busy.nosuspend);
/* Make sure not to leave any processes suspended on the port... */
plp = pp->suspended;
pp->suspended = NULL;
erts_port_task_sched_unlock(&pp->sched);
if (free_nshp)
free_nosuspend_handles(free_nshp);
if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(process_port_unblocked)) {
DTRACE_CHARBUF(port_str, 16);
DTRACE_CHARBUF(pid_str, 16);
ErtsProcList* plp2 = plp;
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
while (plp2 != NULL) {
erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}
#endif
erts_resume_processes(plp);
}
/*
* Schedule cleanup of port structure...
*/
/* We might not be a scheduler, eg. traceing to port we are sys_msg_dispatcher */
if (!erts_get_scheduler_data()) {
erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
} else {
/* Has to be more or less immediate to release any driver */
erts_schedule_thr_prgr_later_op(release_port,
(void *) pp,
&pp->common.u.release);
}
}
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
ASSERT(rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue));
ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
enqueue_port(rq, pp);
}
Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
Port *pp;
ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
pp = pop_port(rq);
ASSERT(!pp
|| rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue));
ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags)
& ERTS_PTS_FLG_IN_RUNQ));
return pp;
}
/*
* Initialize the module.
*/
void
erts_port_task_init(void)
{
init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads
+ 1); /* aux_thread */
init_busy_caller_table_alloc();
}