aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-12-07 00:28:50 +0100
committerRickard Green <[email protected]>2012-12-07 00:28:50 +0100
commit6027f852217f0014f1892fbbfa45c69e104da652 (patch)
treebdb10abcf579b5607dc287b395fb15fa667b0512 /erts/emulator/beam/erl_port_task.c
parentd29c460c4ad1ca0cc2fb6a13a81b4ccc07516941 (diff)
parent3f85767e086e08b70baee34d719df9a8bc8814f4 (diff)
downloadotp-6027f852217f0014f1892fbbfa45c69e104da652.tar.gz
otp-6027f852217f0014f1892fbbfa45c69e104da652.tar.bz2
otp-6027f852217f0014f1892fbbfa45c69e104da652.zip
Merge branch 'rickard/port-optimizations/OTP-10336' into rickard/r16/port-optimizations/OTP-10336
* rickard/port-optimizations/OTP-10336: Change annotate level for emacs-22 in cerl Update etp-commands Add documentation on communication in Erlang Add support for busy port message queue Add driver callback epilogue Implement true asynchronous signaling between processes and ports Add erl_drv_[send|output]_term Move busy port flag Use rwlock for driver list Optimize management of port tasks Improve configuration of process and port tables Remove R9 compatibility features Use ptab functionality also for ports Prepare for use of ptab functionality also for ports Atomic port state Generalize process table implementation Implement functionality for delaying thread progress from unmanaged threads Conflicts: erts/doc/src/erl_driver.xml erts/doc/src/erlang.xml erts/emulator/beam/beam_bif_load.c erts/emulator/beam/beam_bp.c erts/emulator/beam/beam_emu.c erts/emulator/beam/bif.c erts/emulator/beam/copy.c erts/emulator/beam/erl_alloc.c erts/emulator/beam/erl_alloc.types erts/emulator/beam/erl_bif_info.c erts/emulator/beam/erl_bif_port.c erts/emulator/beam/erl_bif_trace.c erts/emulator/beam/erl_init.c erts/emulator/beam/erl_message.c erts/emulator/beam/erl_port_task.c erts/emulator/beam/erl_process.c erts/emulator/beam/erl_process.h erts/emulator/beam/erl_process_lock.c erts/emulator/beam/erl_trace.c erts/emulator/beam/export.h erts/emulator/beam/global.h erts/emulator/beam/io.c erts/emulator/sys/unix/sys.c erts/emulator/sys/vxworks/sys.c erts/emulator/test/port_SUITE.erl erts/etc/unix/cerl.src erts/preloaded/ebin/erlang.beam erts/preloaded/ebin/prim_inet.beam erts/preloaded/src/prim_inet.erl lib/hipe/cerl/erl_bif_types.erl lib/kernel/doc/src/inet.xml lib/kernel/src/inet.erl
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c2036
1 files changed, 1463 insertions, 573 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index b6bc59a1c3..b661c26036 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -33,36 +33,29 @@
#include "erl_port_task.h"
#include "dist.h"
#include "dtrace-wrapper.h"
+#include <stdarg.h>
#if defined(DEBUG) && 0
-#define HARD_DEBUG
+#define ERTS_HARD_DEBUG_TASK_QUEUES
+#else
+#undef ERTS_HARD_DEBUG_TASK_QUEUES
#endif
-/*
- * Costs in reductions for some port operations.
- */
-#define ERTS_PORT_REDS_EXECUTE 0
-#define ERTS_PORT_REDS_FREE 50
-#define ERTS_PORT_REDS_TIMEOUT 200
-#define ERTS_PORT_REDS_INPUT 200
-#define ERTS_PORT_REDS_OUTPUT 200
-#define ERTS_PORT_REDS_EVENT 200
-#define ERTS_PORT_REDS_TERMINATE 100
-
-
-#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
- ((erts_port_status_get((P)) & ERTS_PORT_SFLGS_DEAD) || (P)->id != (ID))
-
-#define ERTS_PORT_IS_IN_RUNQ(RQ, P) \
- ((P)->sched.next || (P)->sched.prev || (RQ)->ports.start == (P))
+#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
+static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
+#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
+ chk_task_queues((PP), (EQ), (PBQ))
+#else
+#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
+#endif
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
- if (DTRACE_ENABLED(driver_ready_input)) { \
+ if (DTRACE_ENABLED(PROBE_NAME)) { \
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
\
- dtrace_pid_str(PP->connected, process_str); \
+ dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \
dtrace_port_str(PP, port_str); \
DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \
}
@@ -72,83 +65,766 @@
erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
-struct ErtsPortTaskQueue_ {
- ErtsPortTask *first;
- ErtsPortTask *last;
- Port *port;
-};
+#define ERTS_PT_STATE_SCHEDULED 0
+#define ERTS_PT_STATE_ABORTED 1
+#define ERTS_PT_STATE_EXECUTING 2
+
+typedef union {
+ struct { /* I/O tasks */
+ ErlDrvEvent event;
+ ErlDrvEventData event_data;
+ } io;
+ struct {
+ ErtsProc2PortSigCallback callback;
+ ErtsProc2PortSigData data;
+ } psig;
+} ErtsPortTaskTypeData;
struct ErtsPortTask_ {
- ErtsPortTask *prev;
- ErtsPortTask *next;
- ErtsPortTaskQueue *queue;
- ErtsPortTaskHandle *handle;
+ erts_smp_atomic32_t state;
ErtsPortTaskType type;
- ErlDrvEvent event;
- ErlDrvEventData event_data;
+ union {
+ struct {
+ ErtsPortTask *next;
+ ErtsPortTaskHandle *handle;
+ int flags;
+ Uint32 ref[ERTS_MAX_REF_NUMBERS];
+ ErtsPortTaskTypeData td;
+ } alive;
+ ErtsThrPrgrLaterOp release;
+ } u;
};
-#ifdef HARD_DEBUG
-#define ERTS_PT_CHK_PORTQ(RQ) check_port_queue((RQ), NULL, 0)
-#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP) check_port_queue((RQ), (PP), -1)
-#define ERTS_PT_CHK_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 1)
-#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 0)
-#define ERTS_PT_CHK_TASKQ(Q) check_task_queue((Q), NULL, 0)
-#define ERTS_PT_CHK_IN_TASKQ(Q, T) check_task_queue((Q), (T), 1)
-#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T) check_task_queue((Q), (T), 0)
-static void
-check_port_queue(Port *chk_pp, int inq);
-static void
-check_task_queue(ErtsPortTaskQueue *ptqp,
- ErtsPortTask *chk_ptp,
- int inq);
-#else
-#define ERTS_PT_CHK_PORTQ(RQ)
-#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_IN_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_TASKQ(Q)
-#define ERTS_PT_CHK_IN_TASKQ(Q, T)
-#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T)
+struct ErtsPortTaskHandleList_ {
+ ErtsPortTaskHandle handle;
+ union {
+ ErtsPortTaskHandleList *next;
+#ifdef ERTS_SMP
+ ErtsThrPrgrLaterOp release;
#endif
+ } u;
+};
+
+typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
+struct ErtsPortTaskBusyCaller_ {
+ ErtsPortTaskBusyCaller *next;
+ Eterm caller;
+ SWord count;
+ ErtsPortTask *last;
+};
+
+#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
+struct ErtsPortTaskBusyCallerTable_ {
+ ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
+ ErtsPortTaskBusyCaller pre_alloc_busy_caller;
+};
+
-static void handle_remaining_tasks(ErtsRunQueue *runq, Port *pp);
+static void begin_port_cleanup(Port *pp, ErtsPortTask **execq);
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
ErtsPortTask,
- 200,
+ 1000,
ERTS_ALC_T_PORT_TASK)
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_taskq,
- ErtsPortTaskQueue,
+
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
+ ErtsPortTaskBusyCallerTable,
50,
- ERTS_ALC_T_PORT_TASKQ)
+ ERTS_ALC_T_BUSY_CALLER_TAB)
+
+#ifdef ERTS_SMP
+static void
+call_port_task_free(void *vptp)
+{
+ port_task_free((ErtsPortTask *) vptp);
+}
+#endif
+
+static ERTS_INLINE void
+schedule_port_task_free(ErtsPortTask *ptp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(call_port_task_free,
+ (void *) ptp,
+ &ptp->u.release);
+#else
+ port_task_free(ptp);
+#endif
+}
+
+static ERTS_INLINE ErtsPortTask *
+p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
+{
+ ErtsPortTask *ptp;
+ char *ptr = (char *) sigdp;
+ ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
+ ptp = (ErtsPortTask *) ptr;
+ ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
+ return ptp;
+}
+
+ErtsProc2PortSigData *
+erts_port_task_alloc_p2p_sig_data(void)
+{
+ ErtsPortTask *ptp = port_task_alloc();
+
+ ptp->type = ERTS_PORT_TASK_PROC_SIG;
+ ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
+ erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+
+ ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
+
+ return &ptp->u.alive.td.psig.data;
+}
+
+static ERTS_INLINE Eterm
+task_caller(ErtsPortTask *ptp)
+{
+ Eterm caller;
+
+ ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
+
+ caller = ptp->u.alive.td.psig.data.caller;
+
+ ASSERT(is_internal_pid(caller) || is_internal_port(caller));
+
+ return caller;
+}
+
+/*
+ * Busy queue management
+ */
+
+static ERTS_INLINE int
+caller2bix(Eterm caller)
+{
+ ASSERT(is_internal_pid(caller) || is_internal_port(caller));
+ return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
+}
+
+
+static void
+popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
+{
+ ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ Eterm caller = task_caller(ptp);
+ int bix = caller2bix(caller);
+
+ ASSERT(is_internal_pid(caller));
+
+ ASSERT(tabp);
+ bcp = tabp->bucket[bix];
+ prev_bcpp = &tabp->bucket[bix];
+ ASSERT(bcp);
+ while (bcp->caller != caller) {
+ prev_bcpp = &bcp->next;
+ bcp = bcp->next;
+ ASSERT(bcp);
+ }
+ ASSERT(bcp->count > 0);
+ if (--bcp->count != 0) {
+ ASSERT(!last);
+ }
+ else {
+ *prev_bcpp = bcp->next;
+ if (bcp == &tabp->pre_alloc_busy_caller)
+ bcp->caller = am_undefined;
+ else
+ erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
+ if (last) {
+#ifdef DEBUG
+ erts_aint32_t flags =
+#endif
+ erts_smp_atomic32_read_band_nob(
+ &pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+#ifdef DEBUG
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ ASSERT(!tabp->bucket[bix]);
+ }
+#endif
+ busy_caller_table_free(tabp);
+ pp->sched.taskq.local.busy.first = NULL;
+ pp->sched.taskq.local.busy.last = NULL;
+ pp->sched.taskq.local.busy.table = NULL;
+ }
+ }
+}
+
+static void
+busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ Eterm caller = task_caller(ptp);
+ ErtsPortTaskBusyCaller *bcp;
+ int bix;
+
+ ASSERT(is_internal_pid(caller));
+ /*
+ * Port is busy and this task type needs to wait until not busy.
+ */
+
+ ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);
+
+ ptp->u.alive.next = NULL;
+ if (pp->sched.taskq.local.busy.last) {
+ ASSERT(pp->sched.taskq.local.busy.first);
+ pp->sched.taskq.local.busy.last->u.alive.next = ptp;
+ }
+ else {
+ int i;
+ erts_aint32_t flags;
+
+ pp->sched.taskq.local.busy.first = ptp;
+ flags = erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
+
+ ASSERT(!tabp);
+
+ tabp = busy_caller_table_alloc();
+ pp->sched.taskq.local.busy.table = tabp;
+ for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
+ tabp->bucket[i] = NULL;
+ tabp->pre_alloc_busy_caller.caller = am_undefined;
+ }
+ pp->sched.taskq.local.busy.last = ptp;
+
+ bix = caller2bix(caller);
+ ASSERT(tabp);
+ bcp = tabp->bucket[bix];
+
+ while (bcp && bcp->caller != caller)
+ bcp = bcp->next;
+
+ if (bcp)
+ bcp->count++;
+ else {
+ if (tabp->pre_alloc_busy_caller.caller == am_undefined)
+ bcp = &tabp->pre_alloc_busy_caller;
+ else
+ bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
+ sizeof(ErtsPortTaskBusyCaller));
+ bcp->caller = caller;
+ bcp->count = 1;
+ bcp->next = tabp->bucket[bix];
+ tabp->bucket[bix] = bcp;
+ }
+
+ bcp->last = ptp;
+}
+
+static ERTS_INLINE int
+check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTask *last_ptp;
+ ErtsPortTaskBusyCaller *bcp;
+ int bix;
+ Eterm caller;
+
+ ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
+ ASSERT(pp->sched.taskq.local.busy.last);
+ ASSERT(tabp);
+
+
+ /*
+ * We are either not busy, or the task does not imply wait on busy port.
+ * However, due to the signaling order requirements the task might depend
+ * on other tasks in the busy queue.
+ */
+
+ caller = task_caller(ptp);
+ bix = caller2bix(caller);
+ bcp = tabp->bucket[bix];
+ while (bcp && bcp->caller != caller)
+ bcp = bcp->next;
+
+ if (!bcp)
+ return 0;
+
+ /*
+ * There are other tasks that we depend on in the busy queue;
+ * move into busy queue.
+ */
+
+ bcp->count++;
+ last_ptp = bcp->last;
+ ptp->u.alive.next = last_ptp->u.alive.next;
+ if (!ptp->u.alive.next) {
+ ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
+ pp->sched.taskq.local.busy.last = ptp;
+ }
+ last_ptp->u.alive.next = ptp;
+ bcp->last = ptp;
+
+ return 1;
+}
+
+static void
+no_sig_dep_move_from_busyq(Port *pp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTask *first_ptp, *last_ptp, *ptp;
+ ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;
+
+ /*
+ * Move tasks at the head of the busy queue that no longer
+ * have any dependencies to busy wait tasks into the ordinary
+ * queue.
+ */
+
+ first_ptp = ptp = pp->sched.taskq.local.busy.first;
+
+ ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
+ ASSERT(tabp);
+
+ do {
+ Eterm caller = task_caller(ptp);
+
+ if (!bcp || bcp->caller != caller) {
+ int bix = caller2bix(caller);
+
+ prev_bcpp = &tabp->bucket[bix];
+ bcp = tabp->bucket[bix];
+ ASSERT(bcp);
+ while (bcp->caller != caller) {
+ ASSERT(bcp);
+ prev_bcpp = &bcp->next;
+ bcp = bcp->next;
+ }
+ }
+
+ ASSERT(bcp->caller == caller);
+ ASSERT(bcp->count > 0);
+
+ if (--bcp->count == 0) {
+ *prev_bcpp = bcp->next;
+ if (bcp == &tabp->pre_alloc_busy_caller)
+ bcp->caller = am_undefined;
+ else
+ erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
+ }
+
+ last_ptp = ptp;
+ ptp = ptp->u.alive.next;
+ } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
+
+ pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
+ if (!pp->sched.taskq.local.busy.first) {
+#ifdef DEBUG
+ int bix;
+ erts_aint32_t flags =
+#endif
+ erts_smp_atomic32_read_band_nob(
+ &pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+#ifdef DEBUG
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ ASSERT(!tabp->bucket[bix]);
+ }
+#endif
+ busy_caller_table_free(tabp);
+ pp->sched.taskq.local.busy.last = NULL;
+ pp->sched.taskq.local.busy.table = NULL;
+ }
+ last_ptp->u.alive.next = pp->sched.taskq.local.first;
+ pp->sched.taskq.local.first = first_ptp;
+}
+
+#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
+
+static void
+chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
+{
+ Sint tot_count, tot_table_count;
+ int bix;
+ ErtsPortTask *ptp, *last;
+ ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
+ ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTaskBusyCaller *bcp;
+
+ if (!first) {
+ ASSERT(!tabp);
+ ASSERT(!pp->sched.taskq.local.busy.last);
+ ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
+ return;
+ }
+
+ ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(tabp);
+
+ tot_count = 0;
+ ptp = first;
+ while (ptp) {
+ Sint count = 0;
+ Eterm caller = task_caller(ptp);
+ int bix = caller2bix(caller);
+ for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
+ if (bcp->caller == caller)
+ break;
+ ASSERT(bcp && bcp->caller == caller);
+
+ ASSERT(bcp->last);
+ while (1) {
+ ErtsPortTask *ptp2;
+
+ ASSERT(caller == task_caller(ptp));
+ count++;
+ tot_count++;
+ last = ptp;
+
+ for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
+ ASSERT(ptp != ptp2);
+ }
+
+ if (ptp == bcp->last)
+ break;
+ ptp = ptp->u.alive.next;
+ }
+
+ ASSERT(count == bcp->count);
+ ptp = ptp->u.alive.next;
+ }
+
+ tot_table_count = 0;
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
+ tot_table_count += bcp->count;
+ }
+
+ ASSERT(tot_count == tot_table_count);
+
+ ASSERT(last == pp->sched.taskq.local.busy.last);
+}
+
+#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */
/*
* Task handle manipulation.
*/
+static ERTS_INLINE void
+reset_port_task_handle(ErtsPortTaskHandle *pthp)
+{
+ erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL);
+}
+
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
- return (ErtsPortTask *) erts_smp_atomic_read_nob(pthp);
+ return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp);
}
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
- if (ptp->handle) {
- ASSERT(ptp == handle2task(ptp->handle));
- erts_smp_atomic_set_nob(ptp->handle, (erts_aint_t) NULL);
+ if (ptp->u.alive.handle) {
+ ASSERT(ptp == handle2task(ptp->u.alive.handle));
+ reset_port_task_handle(ptp->u.alive.handle);
}
}
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
- ptp->handle = pthp;
+ ptp->u.alive.handle = pthp;
if (pthp) {
- erts_smp_atomic_set_nob(pthp, (erts_aint_t) ptp);
- ASSERT(ptp == handle2task(ptp->handle));
+ erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
+ ASSERT(ptp == handle2task(ptp->u.alive.handle));
+ }
+}
+
+
+/*
+ * Busy port queue management
+ */
+
+static erts_aint32_t
+check_unset_busy_port_q(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTaskBusyPortQ *bpq)
+{
+ ErlDrvSizeT qsize, low;
+ int resume_procs = 0;
+
+ ASSERT(bpq);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+
+ erts_port_task_sched_lock(&pp->sched);
+ qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ if (qsize < low) {
+ erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
+ ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ erts_port_task_sched_unlock(&pp->sched);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+
+ return flags;
+}
+
+static ERTS_INLINE void
+aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
+{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
+
+ ASSERT(pp->sched.taskq.bpq);
+
+ if (size == 0)
+ return;
+
+ bpq = pp->sched.taskq.bpq;
+
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ ASSERT(pp->sched.taskq.bpq);
+ if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low))
+ erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+}
+
+static ERTS_INLINE void
+dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
+{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
+
+ ASSERT(pp->sched.taskq.bpq);
+
+ if (size == 0)
+ return;
+
+ bpq = pp->sched.taskq.bpq;
+
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
+ check_unset_busy_port_q(pp, flags, bpq);
+}
+
+static ERTS_INLINE erts_aint32_t
+enqueue_proc2port_data(Port *pp,
+ ErtsProc2PortSigData *sigdp,
+ erts_aint32_t flags)
+{
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ if (sigdp && bpq) {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ if (size) {
+ erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) size);
+ ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
+
+ ASSERT(qsz - size < qsz);
+
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
+ flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
+ qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
+ flags = (erts_smp_atomic32_read_bor_relb(
+ &pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
+ flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ }
+ ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
+ }
+ }
+ return flags;
+}
+
+/*
+ * erl_drv_busy_msgq_limits() is called by drivers either reading or
+ * writing the limits.
+ *
+ * A limit of zero is interpreted as a read only request (using a
+ * limit of zero would not be useful). Other values are interpreted
+ * as a write-read request.
+ */
+
+void
+erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
+{
+ Port *pp = erts_drvport2port(dport, NULL);
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ int written = 0, resume_procs = 0;
+ ErlDrvSizeT low, high;
+
+ if (!pp || !bpq) {
+ if (lowp)
+ *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ if (highp)
+ *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ return;
+ }
+
+ low = lowp ? *lowp : 0;
+ high = highp ? *highp : 0;
+
+ erts_port_task_sched_lock(&pp->sched);
+
+ if (low == ERL_DRV_BUSY_MSGQ_DISABLED
+ || high == ERL_DRV_BUSY_MSGQ_DISABLED) {
+ /* Disable busy msgq feature */
+ erts_aint32_t flags;
+ pp->sched.taskq.bpq = NULL;
+ flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else {
+
+ if (!low)
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ else {
+ if (bpq->high < low)
+ bpq->high = low;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ written = 1;
+ }
+
+ if (!high)
+ high = bpq->high;
+ else {
+ if (low > high) {
+ low = high;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ }
+ bpq->high = high;
+ written = 1;
+ }
+
+ if (written) {
+ ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ if (size > high)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ else if (size < low)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ }
+ }
+
+ erts_port_task_sched_unlock(&pp->sched);
+
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+ if (lowp)
+ *lowp = low;
+ if (highp)
+ *highp = high;
+}
+
+/*
+ * No-suspend handles.
+ */
+
+#ifdef ERTS_SMP
+static void
+free_port_task_handle_list(void *vpthlp)
+{
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
+}
+#endif
+
+static void
+schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
+ (void *) pthlp,
+ &pthlp->u.release);
+#else
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
+#endif
+}
+
+static ERTS_INLINE void
+abort_nosuspend_task(Port *pp,
+ ErtsPortTaskType type,
+ ErtsPortTaskTypeData *tdp)
+{
+
+ ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
+
+ if (!pp->sched.taskq.bpq)
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ aborted_proc2port_data(pp, size);
+ }
+}
+
+static ErtsPortTaskHandleList *
+get_free_nosuspend_handles(Port *pp)
+{
+ ErtsPortTaskHandleList *nshp, *last_nshp = NULL;
+
+ ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));
+
+ nshp = pp->sched.taskq.local.busy.nosuspend;
+
+ while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
+ last_nshp = nshp;
+ nshp = nshp->u.next;
+ }
+
+ if (!last_nshp)
+ nshp = NULL;
+ else {
+ nshp = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
+ last_nshp->u.next = NULL;
+ if (!pp->sched.taskq.local.busy.nosuspend)
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ }
+ return nshp;
+}
+
+static void
+free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
+{
+ while (free_nshp) {
+ ErtsPortTaskHandleList *nshp = free_nshp;
+ free_nshp = free_nshp->u.next;
+ schedule_port_task_handle_list_free(nshp);
}
}
@@ -161,7 +837,6 @@ enqueue_port(ErtsRunQueue *runq, Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
pp->sched.next = NULL;
- pp->sched.in_runq = 1;
if (runq->ports.end) {
ASSERT(runq->ports.start);
runq->ports.end->sched.next = pp;
@@ -199,285 +874,423 @@ pop_port(ErtsRunQueue *runq)
return pp;
}
+/*
+ * Task queue operations
+ */
-#ifdef HARD_DEBUG
+static ERTS_INLINE int
+enqueue_task(Port *pp,
+ ErtsPortTask *ptp,
+ ErtsProc2PortSigData *sigdp,
+ ErtsPortTaskHandleList *ns_pthlp,
+ erts_aint32_t *flagsp)
-static void
-check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq)
{
- Port *pp;
- Port *last_pp;
- Port *first_pp = runq->ports.start;
- int no_forward = 0, no_backward = 0;
- int found_forward = 0, found_backward = 0;
- if (!first_pp) {
- ASSERT(!runq->ports.end);
- }
+ int res;
+ erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
+ erts_aint32_t flags;
+ ptp->u.alive.next = NULL;
+ if (ns_pthlp)
+ fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
+ erts_port_task_sched_lock(&pp->sched);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (flags & fail_flags)
+ res = 0;
else {
- ASSERT(!first_pp->sched.prev);
- for (pp = first_pp; pp; pp = pp->sched.next) {
- ASSERT(pp->sched.taskq);
- if (pp->sched.taskq->first)
- no_forward++;
- if (chk_pp == pp)
- found_forward = 1;
- if (!pp->sched.prev) {
- ASSERT(first_pp == pp);
- }
- if (!pp->sched.next) {
- ASSERT(runq->ports.end == pp);
- last_pp = pp;
- }
- }
- for (pp = last_pp; pp; pp = pp->sched.prev) {
- ASSERT(pp->sched.taskq);
- if (pp->sched.taskq->last)
- no_backward++;
- if (chk_pp == pp)
- found_backward = 1;
- if (!pp->sched.prev) {
- ASSERT(first_pp == pp);
- }
- if (!pp->sched.next) {
- ASSERT(runq->ports.end == pp);
- }
- check_task_queue(pp->sched.taskq, NULL, 0);
+ if (ns_pthlp) {
+ ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
}
- ASSERT(no_forward == no_backward);
- }
- ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len));
- if (chk_pp) {
- if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) {
- ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq);
- }
- ASSERT(!chk_pp->sched.taskq || chk_pp->sched.taskq->first);
- if (inq < 0)
- inq = chk_pp->sched.taskq && !chk_pp->sched.exe_taskq;
- if (inq) {
- ASSERT(found_forward && found_backward);
+ if (pp->sched.taskq.in.last) {
+ ASSERT(pp->sched.taskq.in.first);
+ ASSERT(!pp->sched.taskq.in.last->u.alive.next);
+
+ pp->sched.taskq.in.last->u.alive.next = ptp;
}
else {
- ASSERT(!found_forward && !found_backward);
- }
- }
-}
-
-#endif
+ ASSERT(!pp->sched.taskq.in.first);
-/*
- * Task queue operations
- */
-
-static ERTS_INLINE ErtsPortTaskQueue *
-port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp)
-{
- if (ptqp) {
- ptqp->first = NULL;
- ptqp->last = NULL;
- ptqp->port = pp;
+ pp->sched.taskq.in.first = ptp;
+ }
+ pp->sched.taskq.in.last = ptp;
+ flags = enqueue_proc2port_data(pp, sigdp, flags);
+ res = 1;
}
- return ptqp;
+ erts_port_task_sched_unlock(&pp->sched);
+ *flagsp = flags;
+ return res;
}
static ERTS_INLINE void
-enqueue_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
+prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- ptp->next = NULL;
- ptp->prev = ptqp->last;
- ptp->queue = ptqp;
- if (ptqp->last) {
- ASSERT(ptqp->first);
- ptqp->last->next = ptp;
+ erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+
+ if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
}
else {
- ASSERT(!ptqp->first);
- ptqp->first = ptp;
+ *execqp = pp->sched.taskq.local.busy.first;
+ *processing_busy_q_p = 1;
+ }
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ while (1) {
+ erts_aint32_t new, exp;
+
+ new = exp = act;
+
+ new &= ~ERTS_PTS_FLG_IN_RUNQ;
+ new |= ERTS_PTS_FLG_EXEC;
+
+ act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);
+
+ ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);
+
+ if (exp == act)
+ break;
}
- ptqp->last = ptp;
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
}
-static ERTS_INLINE void
-push_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
+/* finalize_exec() return value != 0 if port should remain active */
+static ERTS_INLINE int
+finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- ptp->next = ptqp->first;
- ptp->prev = NULL;
- ptp->queue = ptqp;
- if (ptqp->first) {
- ASSERT(ptqp->last);
- ptqp->first->prev = ptp;
- }
+ erts_aint32_t act;
+
+ if (!processing_busy_q)
+ pp->sched.taskq.local.first = *execq;
else {
- ASSERT(!ptqp->last);
- ptqp->last = ptp;
+ pp->sched.taskq.local.busy.first = *execq;
+ ASSERT(*execq);
+ }
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);
+
+ *execq = NULL;
+
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
+
+ while (1) {
+ erts_aint32_t new, exp;
+
+ new = exp = act;
+
+ new &= ~ERTS_PTS_FLG_EXEC;
+ if (act & ERTS_PTS_FLG_HAVE_TASKS)
+ new |= ERTS_PTS_FLG_IN_RUNQ;
+
+ act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+
+ ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
+
+ if (exp == act)
+ break;
}
- ptqp->first = ptp;
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
+
+ return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}
-static ERTS_INLINE void
-dequeue_task(ErtsPortTask *ptp)
+static ERTS_INLINE erts_aint32_t
+select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
- ASSERT(ptp);
- ASSERT(ptp->queue);
- ERTS_PT_CHK_IN_TASKQ(ptp->queue, ptp);
- if (ptp->next)
- ptp->next->prev = ptp->prev;
- else {
- ASSERT(ptp->queue->last == ptp);
- ptp->queue->last = ptp->prev;
+ erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+
+ if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ if (flags & ERTS_PTS_FLG_BUSY_PORT) {
+ if (*processing_busy_q_p) {
+ ErtsPortTask *ptp;
+
+ ptp = pp->sched.taskq.local.busy.first = *execqp;
+ if (!ptp)
+ pp->sched.taskq.local.busy.last = NULL;
+ else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
+ no_sig_dep_move_from_busyq(pp);
+
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+ }
+
+ return flags;
}
- if (ptp->prev)
- ptp->prev->next = ptp->next;
- else {
- ASSERT(ptp->queue->first == ptp);
- ptp->queue->first = ptp->next;
+
+ /* Not busy */
+
+ if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
+ pp->sched.taskq.local.first = *execqp;
+ *execqp = pp->sched.taskq.local.busy.first;
+ *processing_busy_q_p = 1;
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
}
- ASSERT(ptp->queue->first || !ptp->queue->last);
- ASSERT(ptp->queue->last || !ptp->queue->first);
- ERTS_PT_CHK_NOT_IN_TASKQ(ptp->queue, ptp);
+ return flags;
}
-static ERTS_INLINE ErtsPortTask *
-pop_task(ErtsPortTaskQueue *ptqp)
+/*
+ * check_task_for_exec() returns a value !0 if the task
+ * is ok to execute; otherwise 0.
+ */
+static ERTS_INLINE int
+check_task_for_exec(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTask **execqp,
+ int *processing_busy_q_p,
+ ErtsPortTask *ptp)
{
- ErtsPortTask *ptp = ptqp->first;
- if (!ptp) {
- ASSERT(!ptqp->last);
+
+ if (!*processing_busy_q_p) {
+ /* Processing normal queue */
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
+
+ if ((flags & ERTS_PTS_FLG_BUSY_PORT)
+ && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
+
+ busy_wait_move_to_busy_queue(pp, ptp);
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ return 0;
+ }
+
+ if (pp->sched.taskq.local.busy.last
+ && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {
+
+ int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ return res;
+ }
+
}
else {
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
- ASSERT(!ptp->prev);
- ptqp->first = ptp->next;
- if (ptqp->first)
- ptqp->first->prev = NULL;
- else {
- ASSERT(ptqp->last == ptp);
- ptqp->last = NULL;
+ /* Processing busy queue */
+
+ ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
+
+ popped_from_busy_queue(pp, ptp, !*execqp);
+
+ if (!*execqp) {
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
}
- ASSERT(ptp->queue->first || !ptp->queue->last);
- ASSERT(ptp->queue->last || !ptp->queue->first);
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
}
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- return ptp;
+
+ return 1;
}
-#ifdef HARD_DEBUG
+static ErtsPortTask *
+fetch_in_queue(Port *pp, ErtsPortTask **execqp)
+{
+ ErtsPortTask *ptp;
+ ErtsPortTaskHandleList *free_nshp = NULL;
-static void
-check_task_queue(ErtsPortTaskQueue *ptqp,
- ErtsPortTask *chk_ptp,
- int inq)
+ erts_port_task_sched_lock(&pp->sched);
+
+ ptp = pp->sched.taskq.in.first;
+ pp->sched.taskq.in.first = NULL;
+ pp->sched.taskq.in.last = NULL;
+ if (ptp)
+ *execqp = ptp->u.alive.next;
+ else
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_TASKS);
+
+
+ if (pp->sched.taskq.local.busy.nosuspend)
+ free_nshp = get_free_nosuspend_handles(pp);
+
+ erts_port_task_sched_unlock(&pp->sched);
+
+ if (free_nshp)
+ free_nosuspend_handles(free_nshp);
+
+ return ptp;
+}
+
+static ERTS_INLINE ErtsPortTask *
+select_task_for_exec(Port *pp,
+ ErtsPortTask **execqp,
+ int *processing_busy_q_p)
{
ErtsPortTask *ptp;
- ErtsPortTask *last_ptp;
- ErtsPortTask *first_ptp = ptqp->first;
- int found_forward = 0, found_backward = 0;
- if (!first_ptp) {
- ASSERT(!ptqp->last);
- }
- else {
- ASSERT(!first_ptp->prev);
- for (ptp = first_ptp; ptp; ptp = ptp->next) {
- ASSERT(ptp->queue == ptqp);
- if (chk_ptp == ptp)
- found_forward = 1;
- if (!ptp->prev) {
- ASSERT(first_ptp == ptp);
- }
- if (!ptp->next) {
- ASSERT(ptqp->last == ptp);
- last_ptp = ptp;
- }
- }
- for (ptp = last_ptp; ptp; ptp = ptp->prev) {
- ASSERT(ptp->queue == ptqp);
- if (chk_ptp == ptp)
- found_backward = 1;
- if (!ptp->prev) {
- ASSERT(first_ptp == ptp);
- }
- if (!ptp->next) {
- ASSERT(ptqp->last == ptp);
- }
- }
- }
- if (chk_ptp) {
- if (inq) {
- ASSERT(found_forward && found_backward);
- }
+ erts_aint32_t flags;
+
+ flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
+
+ while (1) {
+ ptp = *execqp;
+ if (ptp)
+ *execqp = ptp->u.alive.next;
else {
- ASSERT(!found_forward && !found_backward);
+ ptp = fetch_in_queue(pp, execqp);
+ if (!ptp)
+ return NULL;
}
+ if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
+ return ptp;
}
}
-#endif
/*
* Abort a scheduled task.
*/
int
-erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
+erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
- ErtsRunQueue *runq;
- ErtsPortTaskQueue *ptqp;
+ int res;
ErtsPortTask *ptp;
- Port *pp;
-
- pp = &erts_port[internal_port_index(id)];
- runq = erts_port_runq(pp);
- if (!runq)
- return 1;
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
+#endif
ptp = handle2task(pthp);
+ if (!ptp)
+ res = -1;
+ else {
+ erts_aint32_t old_state;
+
+#ifdef DEBUG
+ ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
+ ERTS_SMP_READ_MEMORY_BARRIER;
+ old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (old_state == ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(saved_pthp == pthp);
+ }
+#endif
- if (!ptp) {
- erts_smp_runq_unlock(runq);
- return 1;
+ old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_ABORTED,
+ ERTS_PT_STATE_SCHEDULED);
+ if (old_state != ERTS_PT_STATE_SCHEDULED)
+ res = - 1; /* Task already aborted, executing, or executed */
+ else {
+
+ reset_port_task_handle(pthp);
+
+ switch (ptp->type) {
+ case ERTS_PORT_TASK_INPUT:
+ case ERTS_PORT_TASK_OUTPUT:
+ case ERTS_PORT_TASK_EVENT:
+ ASSERT(erts_smp_atomic_read_nob(
+ &erts_port_task_outstanding_io_tasks) > 0);
+ erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
+ break;
+ case ERTS_PORT_TASK_PROC_SIG:
+ ERTS_INTERNAL_ERROR("Aborted process to port signal");
+ break;
+ default:
+ break;
+ }
+
+ res = 0;
+ }
}
- ASSERT(ptp->handle == pthp);
- ptqp = ptp->queue;
- ASSERT(pp == ptqp->port);
+#ifdef ERTS_SMP
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ASSERT(ptqp);
- ASSERT(ptqp->first);
+ return res;
+}
- dequeue_task(ptp);
- reset_handle(ptp);
+void
+erts_port_task_abort_nosuspend_tasks(Port *pp)
+{
+ erts_aint32_t flags;
+ ErtsPortTaskHandleList *abort_list;
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
+#endif
- switch (ptp->type) {
- case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- case ERTS_PORT_TASK_EVENT:
- ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) > 0);
- erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
- break;
- default:
- break;
- }
+ erts_port_task_sched_lock(&pp->sched);
+ flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ abort_list = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = NULL;
+ erts_port_task_sched_unlock(&pp->sched);
- ASSERT(ptqp == pp->sched.taskq || ptqp == pp->sched.exe_taskq);
+ while (abort_list) {
+#ifdef DEBUG
+ ErtsPortTaskHandle *saved_pthp;
+#endif
+ ErtsPortTaskType type;
+ ErtsPortTaskTypeData td;
+ ErtsPortTaskHandle *pthp;
+ ErtsPortTask *ptp;
+ ErtsPortTaskHandleList *pthlp;
+ erts_aint32_t old_state;
- if (ptqp->first || pp->sched.taskq != ptqp)
- ptqp = NULL;
- else
- pp->sched.taskq = NULL;
+ pthlp = abort_list;
+ abort_list = pthlp->u.next;
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ dhndl = erts_thr_progress_unmanaged_delay();
+#endif
- erts_smp_runq_unlock(runq);
+ pthp = &pthlp->handle;
+ ptp = handle2task(pthp);
+ if (!ptp) {
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+ continue;
+ }
- port_task_free(ptp);
- if (ptqp)
- port_taskq_free(ptqp);
+#ifdef DEBUG
+ saved_pthp = ptp->u.alive.handle;
+ ERTS_SMP_READ_MEMORY_BARRIER;
+ old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (old_state == ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(saved_pthp == pthp);
+ }
+#endif
- return 0;
+ old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_ABORTED,
+ ERTS_PT_STATE_SCHEDULED);
+ if (old_state != ERTS_PT_STATE_SCHEDULED) {
+ /* Task already aborted, executing, or executed */
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+ continue;
+ }
+
+ reset_port_task_handle(pthp);
+
+ type = ptp->type;
+ td = ptp->u.alive.td;
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+
+ abort_nosuspend_task(pp, type, &td);
+ }
}
/*
@@ -488,243 +1301,264 @@ int
erts_port_task_schedule(Eterm id,
ErtsPortTaskHandle *pthp,
ErtsPortTaskType type,
- ErlDrvEvent event,
- ErlDrvEventData event_data)
+ ...)
{
+ ErtsProc2PortSigData *sigdp = NULL;
+ ErtsPortTaskHandleList *ns_pthlp = NULL;
+#ifdef ERTS_SMP
+ ErtsRunQueue *xrunq;
+ ErtsThrPrgrDelayHandle dhndl;
+#endif
ErtsRunQueue *runq;
Port *pp;
- ErtsPortTask *ptp;
- int enq_port = 0;
-
- /*
- * NOTE: We might not have the port lock here. We are only
- * allowed to access the 'sched', 'tab_status',
- * and 'id' fields of the port struct while
- * tasks_lock is held.
- */
+ ErtsPortTask *ptp = NULL;
+ erts_aint32_t act, add_flags;
if (pthp && erts_port_task_is_scheduled(pthp)) {
ASSERT(0);
- erts_port_task_abort(id, pthp);
+ erts_port_task_abort(pthp);
}
- ptp = port_task_alloc();
-
ASSERT(is_internal_port(id));
- pp = &erts_port[internal_port_index(id)];
- runq = erts_port_runq(pp);
- if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) {
- if (runq)
- erts_smp_runq_unlock(runq);
- return -1;
- }
-
- ASSERT(!erts_port_task_is_scheduled(pthp));
+#ifdef ERTS_SMP
+ dhndl = erts_thr_progress_unmanaged_delay();
+#endif
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
+ pp = erts_port_lookup_raw(id);
- if (!pp->sched.taskq && !pp->sched.in_runq && !pp->sched.exe_taskq) {
#ifdef ERTS_SMP
- ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
- if (xrunq) {
- /* Port emigrated ... */
- erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
- erts_smp_runq_unlock(runq);
- runq = erts_port_runq(pp);
- if (!runq)
- return -1;
- }
- enq_port = !pp->sched.taskq && !pp->sched.in_runq && !pp->sched.exe_taskq;
-#else
- enq_port = 1;
-#endif
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ if (pp)
+ erts_port_inc_refc(pp);
+ erts_thr_progress_unmanaged_continue(dhndl);
}
+#endif
- ASSERT(!enq_port
- || !(ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_SUSPENDED));
+ if (!pp)
+ goto fail;
- if (!pp->sched.taskq)
- pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
+ if (type != ERTS_PORT_TASK_PROC_SIG) {
+ ptp = port_task_alloc();
- ASSERT(ptp);
+ ptp->type = type;
+ ptp->u.alive.flags = 0;
- ptp->type = type;
- ptp->event = event;
- ptp->event_data = event_data;
+ erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
- set_handle(ptp, pthp);
+ set_handle(ptp, pthp);
+ }
switch (type) {
- case ERTS_PORT_TASK_FREE:
- erl_exit(ERTS_ABORT_EXIT,
- "erts_port_task_schedule(): Cannot schedule free task\n");
- break;
case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- case ERTS_PORT_TASK_EVENT:
+ case ERTS_PORT_TASK_OUTPUT: {
+ va_list argp;
+ va_start(argp, type);
+ ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
+ va_end(argp);
erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
- /* Fall through... */
+ break;
+ }
+ case ERTS_PORT_TASK_EVENT: {
+ va_list argp;
+ va_start(argp, type);
+ ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
+ ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData);
+ va_end(argp);
+ erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
+ break;
+ }
+ case ERTS_PORT_TASK_PROC_SIG: {
+ va_list argp;
+ ASSERT(!pthp);
+ va_start(argp, type);
+ sigdp = va_arg(argp, ErtsProc2PortSigData *);
+ ptp = p2p_sig_data_to_task(sigdp);
+ ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
+ ptp->u.alive.flags |= va_arg(argp, int);
+ va_end(argp);
+ if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
+ set_handle(ptp, pthp);
+ else {
+ ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
+ sizeof(ErtsPortTaskHandleList));
+ set_handle(ptp, &ns_pthlp->handle);
+ }
+ break;
+ }
default:
- enqueue_task(pp->sched.taskq, ptp);
break;
}
-#ifndef ERTS_SMP
- /*
- * When (!enq_port && !pp->sched.exe_taskq) is true in the smp case,
- * the port might not be in the run queue. If this is the case, another
- * thread is in the process of enqueueing the port. This very seldom
- * occur, but do occur and is a valid scenario. Debug info showing this
- * enqueue in progress must be introduced before we can enable (modified
- * versions of these) assertions in the smp case again.
- */
-#if defined(HARD_DEBUG)
- if (pp->sched.exe_taskq || enq_port)
- ERTS_PT_CHK_NOT_IN_PORTQ(runq, pp);
- else
- ERTS_PT_CHK_IN_PORTQ(runq, pp);
-#elif defined(DEBUG)
- if (!enq_port && !pp->sched.exe_taskq) {
- /* We should be in port run q */
- ASSERT(pp->sched.in_runq);
+ if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
+ reset_handle(ptp);
+ if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
+ goto abort_nosuspend;
+ else
+ goto fail;
}
-#endif
-#endif
- if (!enq_port) {
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
- }
- else {
- enqueue_port(runq, pp);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-
- if (erts_system_profile_flags.runnable_ports) {
- profile_runnable_port(pp, am_active);
+ add_flags = ERTS_PTS_FLG_HAVE_TASKS;
+ if (ns_pthlp)
+ add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+
+ while (1) {
+ erts_aint32_t new, exp;
+
+ if ((act & add_flags) == add_flags
+ && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ goto done; /* Done */
+
+ new = exp = act;
+ new |= add_flags;
+ if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ new |= ERTS_PTS_FLG_IN_RUNQ;
+
+ act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+
+ if (exp == act) {
+ if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ break; /* Need to enqueue port */
+ goto done; /* Done */
}
+ if (act & ERTS_PTS_FLG_EXIT)
+ goto done; /* Died after our task insert... */
+ }
+
+ /* Enqueue port on run-queue */
+
+ runq = erts_port_runq(pp);
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+
+#ifdef ERTS_SMP
+ xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
+ if (xrunq) {
+ /* Port emigrated ... */
+ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
erts_smp_runq_unlock(runq);
+ runq = erts_port_runq(pp);
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ }
+#endif
- erts_smp_notify_inc_runq(runq);
+ enqueue_port(runq, pp);
+
+ if (erts_system_profile_flags.runnable_ports) {
+ profile_runnable_port(pp, am_active);
}
+
+ erts_smp_runq_unlock(runq);
+
+ erts_smp_notify_inc_runq(runq);
+
+done:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ return 0;
+
+abort_nosuspend:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td);
+
+ ASSERT(ns_pthlp);
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+ if (ptp)
+ port_task_free(ptp);
+
return 0;
+
+fail:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ if (ns_pthlp)
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+
+ if (ptp)
+ port_task_free(ptp);
+
+ return -1;
}
void
erts_port_task_free_port(Port *pp)
{
+ ErtsProcList *suspended;
+ erts_aint32_t flags;
ErtsRunQueue *runq;
- ErtsPortTaskQueue *ptqp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+
runq = erts_port_runq(pp);
- ASSERT(runq);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ptqp = pp->sched.exe_taskq;
- if (ptqp) {
- /* I (this thread) am currently executing this port, free it
- when scheduled out... */
- ErtsPortTask *ptp;
- enqueue_free:
- ptp = port_task_alloc();
- erts_smp_port_state_lock(pp);
- pp->status &= ~ERTS_PORT_SFLG_CLOSING;
- pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
- erts_may_save_closed_port(pp);
- erts_smp_port_state_unlock(pp);
- ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 1);
- ptp->type = ERTS_PORT_TASK_FREE;
- ptp->event = (ErlDrvEvent) -1;
- ptp->event_data = NULL;
- set_handle(ptp, NULL);
- push_task(ptqp, ptp);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
- }
- else {
- if (pp->sched.in_runq) {
- ptqp = pp->sched.taskq;
- if (!ptqp)
- pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp);
- goto enqueue_free;
- }
- ASSERT(!pp->sched.taskq);
- erts_smp_port_state_lock(pp);
- pp->status &= ~ERTS_PORT_SFLG_CLOSING;
- pp->status |= ERTS_PORT_SFLG_FREE_SCHEDULED;
- erts_may_save_closed_port(pp);
- erts_smp_port_state_unlock(pp);
- erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
- handle_remaining_tasks(runq, pp); /* May release runq lock */
- ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
- pp->sched.taskq = NULL;
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
- }
-}
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ erts_port_task_sched_lock(&pp->sched);
+ flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_EXIT);
+ suspended = pp->suspended;
+ pp->suspended = NULL;
+ erts_port_task_sched_unlock(&pp->sched);
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE),
+ ERTS_PORT_SFLG_FREE);
-typedef struct {
- ErtsRunQueue *runq;
- int *resp;
-} ErtsPortTaskExeBlockData;
+ erts_smp_runq_unlock(runq);
+
+ if (erts_proclist_fetch(&suspended, NULL))
+ erts_resume_processes(suspended);
+
+ if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ begin_port_cleanup(pp, NULL);
+}
/*
- * Run all scheduled tasks for the first port in run queue. If
- * new tasks appear while running reschedule port (free task is
- * an exception; it is always handled instantly).
+ * Execute scheduled tasks of a port.
*
* erts_port_task_execute() is called by scheduler threads between
- * scheduleing of processes. Sched lock should be held by caller.
+ * scheduling of processes. Run-queue lock should be held by caller.
*/
int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
- ErtsPortTaskQueue *ptqp;
- ErtsPortTask *ptp;
+ ErtsPortTask *execq;
+ int processing_busy_q;
int res = 0;
int reds = ERTS_PORT_REDS_EXECUTE;
erts_aint_t io_tasks_executed = 0;
int fpe_was_unmasked;
+ erts_aint32_t state;
+ int active;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_PT_CHK_PORTQ(runq);
-
pp = pop_port(runq);
if (!pp) {
res = 0;
goto done;
}
- ASSERT(pp->sched.in_runq);
- pp->sched.in_runq = 0;
- if (!pp->sched.taskq) {
- if (erts_system_profile_flags.runnable_ports)
- profile_runnable_port(pp, am_inactive);
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
- goto done;
- }
+ erts_smp_runq_unlock(runq);
*curr_port_pp = pp;
-
- ASSERT(pp->sched.taskq->first);
- ptqp = pp->sched.taskq;
- pp->sched.taskq = NULL;
-
- ASSERT(!pp->sched.exe_taskq);
- pp->sched.exe_taskq = ptqp;
-
- if (erts_smp_port_trylock(pp) == EBUSY) {
- erts_smp_runq_unlock(runq);
- erts_smp_port_lock(pp);
- erts_smp_runq_lock(runq);
- }
if (erts_sched_stat.enabled) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -741,77 +1575,94 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_spin_unlock(&erts_sched_stat.lock);
}
+ prepare_exec(pp, &execq, &processing_busy_q);
+
+ erts_smp_port_lock(pp);
+
/* trace port scheduling, in */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_in);
}
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ fpe_was_unmasked = erts_block_fpe();
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ptp = pop_task(ptqp);
+ state = erts_atomic32_read_nob(&pp->state);
+ goto begin_handle_tasks;
- fpe_was_unmasked = erts_block_fpe();
+ while (1) {
+ erts_aint32_t task_state;
+ ErtsPortTask *ptp;
- while (ptp) {
- ASSERT(pp->sched.taskq != pp->sched.exe_taskq);
+ ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
+ if (!ptp)
+ break;
+
+ task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_EXECUTING,
+ ERTS_PT_STATE_SCHEDULED);
+ if (task_state != ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(task_state == ERTS_PT_STATE_ABORTED);
+ goto aborted_port_task;
+ }
reset_handle(ptp);
- erts_smp_runq_unlock(runq);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
ERTS_SMP_CHK_NO_PROC_LOCKS;
ASSERT(pp->drv_ptr);
switch (ptp->type) {
- case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */
- reds += ERTS_PORT_REDS_FREE;
- erts_smp_runq_lock(runq);
-
- erts_unblock_fpe(fpe_was_unmasked);
- ASSERT(pp->status & ERTS_PORT_SFLG_FREE_SCHEDULED);
- if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first))
- handle_remaining_tasks(runq, pp);
- ASSERT(!ptqp->first
- && (!pp->sched.taskq || !pp->sched.taskq->first));
- erts_smp_atomic_dec_nob(&pp->refc); /* Not alive */
- ERTS_LC_ASSERT(erts_smp_atomic_read_nob(&pp->refc) > 0); /* Lock */
-
- port_task_free(ptp);
- if (pp->sched.taskq)
- port_taskq_free(pp->sched.taskq);
- pp->sched.taskq = NULL;
-
- goto tasks_done;
case ERTS_PORT_TASK_TIMEOUT:
reds += ERTS_PORT_REDS_TIMEOUT;
- if (!(pp->status & ERTS_PORT_SFLGS_DEAD)) {
+ if (!(state & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
break;
case ERTS_PORT_TASK_INPUT:
reds += ERTS_PORT_REDS_INPUT;
- ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
- (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
+ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.td.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT:
reds += ERTS_PORT_REDS_OUTPUT;
- ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
- (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
+ (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.td.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
reds += ERTS_PORT_REDS_EVENT;
- ASSERT((pp->status & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
- (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
+ (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.td.io.event,
+ ptp->u.alive.td.io.event_data);
io_tasks_executed++;
break;
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
+ if (!pp->sched.taskq.bpq)
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ dequeued_proc2port_data(pp, size);
+ }
+ break;
+ }
case ERTS_PORT_TASK_DIST_CMD:
reds += erts_dist_command(pp, CONTEXT_REDS-reds);
break;
@@ -822,33 +1673,31 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
}
- if ((pp->status & ERTS_PORT_SFLG_CLOSING)
- && erts_is_port_ioq_empty(pp)) {
- reds += ERTS_PORT_REDS_TERMINATE;
- erts_terminate_port(pp);
- }
+ reds += erts_port_driver_callback_epilogue(pp, &state);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ aborted_port_task:
+ schedule_port_task_free(ptp);
-#ifdef ERTS_SMP
- if (pp->xports)
- erts_smp_xports_unlock(pp);
- ASSERT(!pp->xports);
-#endif
-
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ begin_handle_tasks:
+ if (state & ERTS_PORT_SFLG_FREE) {
+ reds += ERTS_PORT_REDS_FREE;
- port_task_free(ptp);
+ begin_port_cleanup(pp, &execq);
- erts_smp_runq_lock(runq);
+ break;
+ }
- ptp = pop_task(ptqp);
+ if (reds >= CONTEXT_REDS)
+ break;
}
- tasks_done:
-
erts_unblock_fpe(fpe_was_unmasked);
+ /* trace port scheduling, out */
+ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports(pp, am_out);
+ }
+
if (io_tasks_executed) {
ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
>= io_tasks_executed);
@@ -856,15 +1705,19 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
-1*io_tasks_executed);
}
- *curr_port_pp = NULL;
-
#ifdef ERTS_SMP
ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif
- if (!pp->sched.taskq) {
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
+ active = finalize_exec(pp, &execq, processing_busy_q);
+
+ erts_port_release(pp);
+
+ *curr_port_pp = NULL;
+
+ erts_smp_runq_lock(runq);
+
+ if (!active) {
if (erts_system_profile_flags.runnable_ports)
profile_runnable_port(pp, am_inactive);
}
@@ -873,16 +1726,13 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ErtsRunQueue *xrunq;
#endif
- ASSERT(!(pp->status & ERTS_PORT_SFLGS_DEAD));
- ASSERT(pp->sched.taskq->first);
+ ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
#ifdef ERTS_SMP
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
if (!xrunq) {
#endif
enqueue_port(runq, pp);
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
/* No need to notify ourselves about inc in runq. */
#ifdef ERTS_SMP
}
@@ -892,49 +1742,20 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_runq_unlock(runq);
xrunq = erts_port_runq(pp);
- if (xrunq) {
- enqueue_port(xrunq, pp);
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
- erts_smp_runq_unlock(xrunq);
- erts_smp_notify_inc_runq(xrunq);
- }
+ ASSERT(xrunq);
+ enqueue_port(xrunq, pp);
+ erts_smp_runq_unlock(xrunq);
+ erts_smp_notify_inc_runq(xrunq);
erts_smp_runq_lock(runq);
}
#endif
}
+ done:
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
!= (erts_aint_t) 0);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-
- port_taskq_free(ptqp);
-
- /* trace port scheduling, out */
- if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports(pp, am_out);
- }
-#ifndef ERTS_SMP
- erts_port_release(pp);
-#else
- {
- erts_aint_t refc;
- erts_smp_mtx_unlock(pp->lock);
- refc = erts_smp_atomic_dec_read_nob(&pp->refc);
- ASSERT(refc >= 0);
- if (refc == 0) {
- erts_smp_runq_unlock(runq);
- erts_port_cleanup(pp); /* Might aquire runq lock */
- erts_smp_runq_lock(runq);
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
- }
- }
-#endif
-
- done:
runq->scheduler->reductions += reds;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
@@ -943,78 +1764,146 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
return res;
}
-/*
- * Handle remaining tasks after a free task.
- */
+#ifdef ERTS_SMP
+static void
+release_port(void *vport)
+{
+ erts_port_dec_refc((Port *) vport);
+}
+#endif
static void
-handle_remaining_tasks(ErtsRunQueue *runq, Port *pp)
+begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
{
- int i;
- ErtsPortTask *ptp;
- ErtsPortTaskQueue *ptqps[] = {pp->sched.exe_taskq, pp->sched.taskq};
+ int i, max;
+ ErtsPortTask *qs[2];
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- for (i = 0; i < sizeof(ptqps)/sizeof(ErtsPortTaskQueue *); i++) {
- if (!ptqps[i])
- continue;
- ptp = pop_task(ptqps[i]);
- while (ptp) {
+ /*
+ * Handle remaining tasks...
+ */
+
+ max = 0;
+ if (execqp && *execqp) {
+ qs[max++] = *execqp;
+ *execqp = NULL;
+ }
+
+ erts_port_task_sched_lock(&pp->sched);
+ qs[max] = pp->sched.taskq.in.first;
+ pp->sched.taskq.in.first = NULL;
+ pp->sched.taskq.in.last = NULL;
+ erts_port_task_sched_unlock(&pp->sched);
+ if (qs[max])
+ max++;
+
+ for (i = 0; i < max; i++) {
+ while (1) {
+ erts_aint32_t state;
+ ErtsPortTask *ptp = qs[i];
+ if (!ptp)
+ break;
+
+ qs[i] = ptp->u.alive.next;
+
+ /* Normal case here is aborted tasks... */
+ state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (state == ERTS_PT_STATE_ABORTED)
+ goto aborted_port_task;
+
+ state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_EXECUTING,
+ ERTS_PT_STATE_SCHEDULED);
+ if (state != ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(state == ERTS_PT_STATE_ABORTED);
+ goto aborted_port_task;
+ }
+
reset_handle(ptp);
- erts_smp_runq_unlock(runq);
switch (ptp->type) {
- case ERTS_PORT_TASK_FREE:
case ERTS_PORT_TASK_TIMEOUT:
break;
case ERTS_PORT_TASK_INPUT:
- erts_stale_drv_select(pp->id, ptp->event, DO_READ, 1);
+ erts_stale_drv_select(pp->common.id,
+ ptp->u.alive.td.io.event,
+ DO_READ,
+ 1);
break;
case ERTS_PORT_TASK_OUTPUT:
- erts_stale_drv_select(pp->id, ptp->event, DO_WRITE, 1);
+ erts_stale_drv_select(pp->common.id,
+ ptp->u.alive.td.io.event,
+ DO_WRITE,
+ 1);
break;
case ERTS_PORT_TASK_EVENT:
- erts_stale_drv_select(pp->id, ptp->event, 0, 1);
+ erts_stale_drv_select(pp->common.id,
+ ptp->u.alive.td.io.event,
+ 0,
+ 1);
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
+ if (!pp->sched.taskq.bpq)
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ aborted_proc2port_data(pp, size);
+ }
+ break;
+ }
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
(int) ptp->type);
}
- port_task_free(ptp);
-
- erts_smp_runq_lock(runq);
- ptp = pop_task(ptqps[i]);
+ aborted_port_task:
+ schedule_port_task_free(ptp);
}
}
- ASSERT(!pp->sched.taskq || !pp->sched.taskq->first);
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_TASKS);
+
+ /*
+ * Schedule cleanup of port structure...
+ */
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(release_port,
+ (void *) pp,
+ &pp->common.u.release);
+#else
+ pp->cleanup = 1;
+#endif
}
int
erts_port_is_scheduled(Port *pp)
{
- int res;
- ErtsRunQueue *runq = erts_port_runq(pp);
- if (!runq)
- return 0;
- res = pp->sched.taskq || pp->sched.exe_taskq;
- erts_smp_runq_unlock(runq);
- return res;
+ erts_aint32_t flags = erts_smp_atomic32_read_acqb(&pp->sched.flags);
+ return (flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) != 0;
}
#ifdef ERTS_SMP
+
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(pp->sched.in_runq);
+ ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
enqueue_port(rq, pp);
}
@@ -1026,7 +1915,8 @@ erts_dequeue_port(ErtsRunQueue *rq)
pp = pop_port(rq);
ASSERT(!pp
|| rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(!pp || pp->sched.in_runq);
+ ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags)
+ & ERTS_PTS_FLG_IN_RUNQ));
return pp;
}
@@ -1041,5 +1931,5 @@ erts_port_task_init(void)
erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
(erts_aint_t) 0);
init_port_task_alloc();
- init_port_taskq_alloc();
+ init_busy_caller_table_alloc();
}