aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-09-21 15:12:07 +0200
committerRickard Green <[email protected]>2012-12-07 00:24:26 +0100
commit6e01408aba71e26884c5db81b8e4fa89bd803576 (patch)
tree709bc0a2da80ffdc73fb7426a3de80a55774ff58 /erts/emulator/beam/erl_port_task.c
parent23c6f9e07a3cae7c05e55abd01ff798384241538 (diff)
downloadotp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.gz
otp-6e01408aba71e26884c5db81b8e4fa89bd803576.tar.bz2
otp-6e01408aba71e26884c5db81b8e4fa89bd803576.zip
Implement true asynchronous signaling between processes and ports
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c927
1 files changed, 859 insertions, 68 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index e613ad0144..e8ed00daee 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -36,9 +36,20 @@
#include <stdarg.h>
#if defined(DEBUG) && 0
-#define HARD_DEBUG
+#define ERTS_HARD_DEBUG_TASK_QUEUES
+#else
+#undef ERTS_HARD_DEBUG_TASK_QUEUES
+#endif
+
+#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
+static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
+#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
+ chk_task_queues((PP), (EQ), (PBQ))
+#else
+#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
#endif
+
/*
* Costs in reductions for some port operations.
*/
@@ -52,7 +63,7 @@
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
- if (DTRACE_ENABLED(driver_ready_input)) { \
+ if (DTRACE_ENABLED(PROBE_NAME)) { \
DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
\
@@ -70,6 +81,17 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#define ERTS_PT_STATE_ABORTED 1
#define ERTS_PT_STATE_EXECUTING 2
+typedef union {
+ struct { /* I/O tasks */
+ ErlDrvEvent event;
+ ErlDrvEventData event_data;
+ } io;
+ struct {
+ ErtsProc2PortSigCallback callback;
+ ErtsProc2PortSigData data;
+ } psig;
+} ErtsPortTaskTypeData;
+
struct ErtsPortTask_ {
erts_smp_atomic32_t state;
ErtsPortTaskType type;
@@ -77,17 +99,39 @@ struct ErtsPortTask_ {
struct {
ErtsPortTask *next;
ErtsPortTaskHandle *handle;
- union {
- struct { /* I/O tasks */
- ErlDrvEvent event;
- ErlDrvEventData event_data;
- } io;
- } u;
+ int flags;
+ Uint32 ref[ERTS_MAX_REF_NUMBERS];
+ ErtsPortTaskTypeData td;
} alive;
ErtsThrPrgrLaterOp release;
} u;
};
+struct ErtsPortTaskHandleList_ {
+ ErtsPortTaskHandle handle;
+ union {
+ ErtsPortTaskHandleList *next;
+#ifdef ERTS_SMP
+ ErtsThrPrgrLaterOp release;
+#endif
+ } u;
+};
+
+typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
+struct ErtsPortTaskBusyCaller_ {
+ ErtsPortTaskBusyCaller *next;
+ Eterm caller;
+ SWord count;
+ ErtsPortTask *last;
+};
+
+#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
+struct ErtsPortTaskBusyCallerTable_ {
+ ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
+ ErtsPortTaskBusyCaller pre_alloc_busy_caller;
+};
+
+
static void begin_port_cleanup(Port *pp, ErtsPortTask **execq);
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
@@ -95,6 +139,11 @@ ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
1000,
ERTS_ALC_T_PORT_TASK)
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
+ ErtsPortTaskBusyCallerTable,
+ 50,
+ ERTS_ALC_T_BUSY_CALLER_TAB)
+
#ifdef ERTS_SMP
static void
call_port_task_free(void *vptp)
@@ -115,6 +164,358 @@ schedule_port_task_free(ErtsPortTask *ptp)
#endif
}
+static ERTS_INLINE ErtsPortTask *
+p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
+{
+ ErtsPortTask *ptp;
+ char *ptr = (char *) sigdp;
+ ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
+ ptp = (ErtsPortTask *) ptr;
+ ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
+ return ptp;
+}
+
+ErtsProc2PortSigData *
+erts_port_task_alloc_p2p_sig_data(void)
+{
+ ErtsPortTask *ptp = port_task_alloc();
+
+ ptp->type = ERTS_PORT_TASK_PROC_SIG;
+ ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
+ erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+
+ ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
+
+ return &ptp->u.alive.td.psig.data;
+}
+
+static ERTS_INLINE Eterm
+task_caller(ErtsPortTask *ptp)
+{
+ Eterm caller;
+
+ ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
+
+ caller = ptp->u.alive.td.psig.data.caller;
+
+ ASSERT(is_internal_pid(caller) || is_internal_port(caller));
+
+ return caller;
+}
+
+/*
+ * Busy queue management
+ */
+
+static ERTS_INLINE int
+caller2bix(Eterm caller)
+{
+ ASSERT(is_internal_pid(caller) || is_internal_port(caller));
+ return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
+}
+
+
+static void
+popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
+{
+ ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ Eterm caller = task_caller(ptp);
+ int bix = caller2bix(caller);
+
+ ASSERT(is_internal_pid(caller));
+
+ ASSERT(tabp);
+ bcp = tabp->bucket[bix];
+ prev_bcpp = &tabp->bucket[bix];
+ ASSERT(bcp);
+ while (bcp->caller != caller) {
+ prev_bcpp = &bcp->next;
+ bcp = bcp->next;
+ ASSERT(bcp);
+ }
+ ASSERT(bcp->count > 0);
+ if (--bcp->count != 0) {
+ ASSERT(!last);
+ }
+ else {
+ *prev_bcpp = bcp->next;
+ if (bcp == &tabp->pre_alloc_busy_caller)
+ bcp->caller = am_undefined;
+ else
+ erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
+ if (last) {
+#ifdef DEBUG
+ erts_aint32_t flags =
+#endif
+ erts_smp_atomic32_read_band_nob(
+ &pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+#ifdef DEBUG
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ ASSERT(!tabp->bucket[bix]);
+ }
+#endif
+ busy_caller_table_free(tabp);
+ pp->sched.taskq.local.busy.first = NULL;
+ pp->sched.taskq.local.busy.last = NULL;
+ pp->sched.taskq.local.busy.table = NULL;
+ }
+ }
+}
+
+static void
+busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ Eterm caller = task_caller(ptp);
+ ErtsPortTaskBusyCaller *bcp;
+ int bix;
+
+ ASSERT(is_internal_pid(caller));
+ /*
+ * Port is busy and this task type needs to wait until not busy.
+ */
+
+ ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);
+
+ ptp->u.alive.next = NULL;
+ if (pp->sched.taskq.local.busy.last) {
+ ASSERT(pp->sched.taskq.local.busy.first);
+ pp->sched.taskq.local.busy.last->u.alive.next = ptp;
+ }
+ else {
+ int i;
+ erts_aint32_t flags;
+
+ pp->sched.taskq.local.busy.first = ptp;
+ flags = erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
+
+ ASSERT(!tabp);
+
+ tabp = busy_caller_table_alloc();
+ pp->sched.taskq.local.busy.table = tabp;
+ for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
+ tabp->bucket[i] = NULL;
+ tabp->pre_alloc_busy_caller.caller = am_undefined;
+ }
+ pp->sched.taskq.local.busy.last = ptp;
+
+ bix = caller2bix(caller);
+ ASSERT(tabp);
+ bcp = tabp->bucket[bix];
+
+ while (bcp && bcp->caller != caller)
+ bcp = bcp->next;
+
+ if (bcp)
+ bcp->count++;
+ else {
+ if (tabp->pre_alloc_busy_caller.caller == am_undefined)
+ bcp = &tabp->pre_alloc_busy_caller;
+ else
+ bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
+ sizeof(ErtsPortTaskBusyCaller));
+ bcp->caller = caller;
+ bcp->count = 1;
+ bcp->next = tabp->bucket[bix];
+ tabp->bucket[bix] = bcp;
+ }
+
+ bcp->last = ptp;
+}
+
+static ERTS_INLINE int
+check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTask *last_ptp;
+ ErtsPortTaskBusyCaller *bcp;
+ int bix;
+ Eterm caller;
+
+ ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
+ ASSERT(pp->sched.taskq.local.busy.last);
+ ASSERT(tabp);
+
+
+ /*
+ * We are either not busy, or the task does not imply wait on busy port.
+ * However, due to the signaling order requirements the task might depend
+ * on other tasks in the busy queue.
+ */
+
+ caller = task_caller(ptp);
+ bix = caller2bix(caller);
+ bcp = tabp->bucket[bix];
+ while (bcp && bcp->caller != caller)
+ bcp = bcp->next;
+
+ if (!bcp)
+ return 0;
+
+ /*
+ * There are other tasks that we depend on in the busy queue;
+ * move into busy queue.
+ */
+
+ bcp->count++;
+ last_ptp = bcp->last;
+ ptp->u.alive.next = last_ptp->u.alive.next;
+ if (!ptp->u.alive.next) {
+ ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
+ pp->sched.taskq.local.busy.last = ptp;
+ }
+ last_ptp->u.alive.next = ptp;
+ bcp->last = ptp;
+
+ return 1;
+}
+
+static void
+no_sig_dep_move_from_busyq(Port *pp)
+{
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTask *first_ptp, *last_ptp, *ptp;
+ ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;
+
+ /*
+ * Move tasks at the head of the busy queue that no longer
+ * have any dependencies to busy wait tasks into the ordinary
+ * queue.
+ */
+
+ first_ptp = ptp = pp->sched.taskq.local.busy.first;
+
+ ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
+ ASSERT(tabp);
+
+ do {
+ Eterm caller = task_caller(ptp);
+
+ if (!bcp || bcp->caller != caller) {
+ int bix = caller2bix(caller);
+
+ prev_bcpp = &tabp->bucket[bix];
+ bcp = tabp->bucket[bix];
+ ASSERT(bcp);
+ while (bcp->caller != caller) {
+ ASSERT(bcp);
+ prev_bcpp = &bcp->next;
+ bcp = bcp->next;
+ }
+ }
+
+ ASSERT(bcp->caller == caller);
+ ASSERT(bcp->count > 0);
+
+ if (--bcp->count == 0) {
+ *prev_bcpp = bcp->next;
+ if (bcp == &tabp->pre_alloc_busy_caller)
+ bcp->caller = am_undefined;
+ else
+ erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
+ }
+
+ last_ptp = ptp;
+ ptp = ptp->u.alive.next;
+ } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
+
+ pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
+ if (!pp->sched.taskq.local.busy.first) {
+#ifdef DEBUG
+ int bix;
+ erts_aint32_t flags =
+#endif
+ erts_smp_atomic32_read_band_nob(
+ &pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+#ifdef DEBUG
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ ASSERT(!tabp->bucket[bix]);
+ }
+#endif
+ busy_caller_table_free(tabp);
+ pp->sched.taskq.local.busy.last = NULL;
+ pp->sched.taskq.local.busy.table = NULL;
+ }
+ last_ptp->u.alive.next = pp->sched.taskq.local.first;
+ pp->sched.taskq.local.first = first_ptp;
+}
+
+#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
+
+static void
+chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
+{
+ Sint tot_count, tot_table_count;
+ int bix;
+ ErtsPortTask *ptp, *last;
+ ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
+ ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
+ ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
+ ErtsPortTaskBusyCaller *bcp;
+
+ if (!first) {
+ ASSERT(!tabp);
+ ASSERT(!pp->sched.taskq.local.busy.last);
+ ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
+ return;
+ }
+
+ ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(tabp);
+
+ tot_count = 0;
+ ptp = first;
+ while (ptp) {
+ Sint count = 0;
+ Eterm caller = task_caller(ptp);
+ int bix = caller2bix(caller);
+ for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
+ if (bcp->caller == caller)
+ break;
+ ASSERT(bcp && bcp->caller == caller);
+
+ ASSERT(bcp->last);
+ while (1) {
+ ErtsPortTask *ptp2;
+
+ ASSERT(caller == task_caller(ptp));
+ count++;
+ tot_count++;
+ last = ptp;
+
+ for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
+ ASSERT(ptp != ptp2);
+ }
+
+ if (ptp == bcp->last)
+ break;
+ ptp = ptp->u.alive.next;
+ }
+
+ ASSERT(count == bcp->count);
+ ptp = ptp->u.alive.next;
+ }
+
+ tot_table_count = 0;
+ for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
+ for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
+ tot_table_count += bcp->count;
+ }
+
+ ASSERT(tot_count == tot_table_count);
+
+ ASSERT(last == pp->sched.taskq.local.busy.last);
+}
+
+#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */
+
/*
* Task handle manipulation.
*/
@@ -151,6 +552,141 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
}
/*
+ * No-suspend handles.
+ */
+
+#ifdef ERTS_SMP
+static void
+free_port_task_handle_list(void *vpthlp)
+{
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
+}
+#endif
+
+static void
+schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
+ (void *) pthlp,
+ &pthlp->u.release);
+#else
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
+#endif
+}
+
+static ERTS_INLINE void
+abort_nosuspend_task(ErtsPortTaskType type,
+ ErtsPortTaskTypeData *tdp)
+{
+
+ if (type != ERTS_PORT_TASK_PROC_SIG)
+ ERTS_INTERNAL_ERROR("Invalid no-suspend port task type");
+
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+}
+
+
+static void
+save_nosuspend_handle(Port *pp, ErtsPortTask *ptp)
+{
+ erts_aint32_t act;
+ ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
+ sizeof(ErtsPortTaskHandleList));
+
+ set_handle(ptp, &pthlp->handle);
+
+ ASSERT(ptp == handle2task(&pthlp->handle));
+ ASSERT(ptp->u.alive.handle == &pthlp->handle);
+
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+
+ if (!(act & ERTS_PTS_FLG_BUSY)) {
+
+ erts_port_task_sched_lock(&pp->sched);
+ pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = pthlp;
+ erts_port_task_sched_unlock(&pp->sched);
+
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+
+ while (1) {
+ erts_aint32_t exp, new;
+
+ if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS))
+ == ERTS_PTS_FLG_HAVE_NS_TASKS)
+ return;
+
+ if (act & ERTS_PTS_FLG_BUSY) {
+ erts_aint32_t s;
+ s = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_ABORTED,
+ ERTS_PT_STATE_SCHEDULED);
+ if (s == ERTS_PT_STATE_SCHEDULED) {
+ reset_port_task_handle(&pthlp->handle);
+ break; /* Abort task */
+ }
+ /* Else: someone else handled it */
+ return;
+ }
+
+ new = exp = act;
+
+ new |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+
+ act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+ if (act == exp)
+ return;
+
+ }
+ }
+
+
+ abort_nosuspend_task(ptp->type, &ptp->u.alive.td);
+}
+
+
+static ErtsPortTaskHandleList *
+get_free_nosuspend_handles(Port *pp)
+{
+ ErtsPortTaskHandleList *nshp, *last_nshp = NULL;
+
+ ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));
+
+ nshp = pp->sched.taskq.local.busy.nosuspend;
+
+ while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
+ last_nshp = nshp;
+ nshp = nshp->u.next;
+ }
+
+ if (!last_nshp)
+ nshp = NULL;
+ else {
+ nshp = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
+ last_nshp->u.next = NULL;
+ if (!pp->sched.taskq.local.busy.nosuspend)
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ }
+ return nshp;
+}
+
+static void
+free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
+{
+ while (free_nshp) {
+ ErtsPortTaskHandleList *nshp = free_nshp;
+ free_nshp = free_nshp->u.next;
+ schedule_port_task_handle_list_free(nshp);
+ }
+}
+
+/*
* Port queue operations
*/
@@ -225,42 +761,21 @@ enqueue_task(Port *pp, ErtsPortTask *ptp)
return flags;
}
-static ERTS_INLINE ErtsPortTask *
-pop_task(Port *pp, ErtsPortTask **execqp)
+static ERTS_INLINE void
+prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
- ErtsPortTask *ptp;
+ erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
- ptp = *execqp;
- if (ptp) {
- *execqp = ptp->u.alive.next;
- return ptp;
+ if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) {
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
+ }
+ else {
+ *execqp = pp->sched.taskq.local.busy.first;
+ *processing_busy_q_p = 1;
}
- ASSERT(!pp->sched.taskq.local);
-
- erts_port_task_sched_lock(&pp->sched);
- ptp = pp->sched.taskq.in.first;
- pp->sched.taskq.in.first = NULL;
- pp->sched.taskq.in.last = NULL;
- if (ptp)
- *execqp = ptp->u.alive.next;
- else
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
- ~ERTS_PTS_FLG_HAVE_TASKS);
- erts_port_task_sched_unlock(&pp->sched);
-
-
- return ptp;
-}
-
-static ERTS_INLINE void
-prepare_exec(Port *pp, ErtsPortTask **execqp)
-{
- erts_aint32_t act;
- *execqp = pp->sched.taskq.local;
-
- /* guess a likely value */
- act = ERTS_PTS_FLG_HAVE_TASKS|ERTS_PTS_FLG_IN_RUNQ;
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
while (1) {
erts_aint32_t new, exp;
@@ -281,11 +796,19 @@ prepare_exec(Port *pp, ErtsPortTask **execqp)
/* finalize_exec() return value != 0 if port should remain active */
static ERTS_INLINE int
-finalize_exec(Port *pp, ErtsPortTask **execq)
+finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
erts_aint32_t act;
- pp->sched.taskq.local = *execq;
+ if (!processing_busy_q)
+ pp->sched.taskq.local.first = *execq;
+ else {
+ pp->sched.taskq.local.busy.first = *execq;
+ ASSERT(*execq);
+ }
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);
+
*execq = NULL;
/* guess a likely value */
@@ -313,6 +836,155 @@ finalize_exec(Port *pp, ErtsPortTask **execq)
return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}
+static ERTS_INLINE erts_aint32_t
+select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
+{
+ erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ if (flags & ERTS_PTS_FLG_BUSY) {
+ if (*processing_busy_q_p) {
+ ErtsPortTask *ptp;
+
+ ptp = pp->sched.taskq.local.busy.first = *execqp;
+ if (!ptp)
+ pp->sched.taskq.local.busy.last = NULL;
+ else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
+ no_sig_dep_move_from_busyq(pp);
+
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+ }
+
+ return flags;
+ }
+
+ /* Not busy */
+
+ if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
+ pp->sched.taskq.local.first = *execqp;
+ *execqp = pp->sched.taskq.local.busy.first;
+ *processing_busy_q_p = 1;
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+ }
+
+ return flags;
+}
+
+/*
+ * check_task_for_exec() returns a value !0 if the task
+ * is ok to execute; otherwise 0.
+ */
+static ERTS_INLINE int
+check_task_for_exec(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTask **execqp,
+ int *processing_busy_q_p,
+ ErtsPortTask *ptp)
+{
+
+ if (!*processing_busy_q_p) {
+ /* Processing normal queue */
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
+
+ if ((flags & ERTS_PTS_FLG_BUSY)
+ && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
+
+ busy_wait_move_to_busy_queue(pp, ptp);
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ return 0;
+ }
+
+ if (pp->sched.taskq.local.busy.last
+ && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {
+
+ int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ return res;
+ }
+
+ }
+ else {
+ /* Processing busy queue */
+
+ ASSERT(!(flags & ERTS_PTS_FLG_BUSY));
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
+
+ popped_from_busy_queue(pp, ptp, !*execqp);
+
+ if (!*execqp) {
+ *execqp = pp->sched.taskq.local.first;
+ *processing_busy_q_p = 0;
+ }
+
+ ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
+
+ }
+
+ return 1;
+}
+
+static ErtsPortTask *
+fetch_in_queue(Port *pp, ErtsPortTask **execqp)
+{
+ ErtsPortTask *ptp;
+ ErtsPortTaskHandleList *free_nshp = NULL;
+
+ erts_port_task_sched_lock(&pp->sched);
+
+ ptp = pp->sched.taskq.in.first;
+ pp->sched.taskq.in.first = NULL;
+ pp->sched.taskq.in.last = NULL;
+ if (ptp)
+ *execqp = ptp->u.alive.next;
+ else
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_TASKS);
+
+
+ if (pp->sched.taskq.local.busy.nosuspend)
+ free_nshp = get_free_nosuspend_handles(pp);
+
+ erts_port_task_sched_unlock(&pp->sched);
+
+ if (free_nshp)
+ free_nosuspend_handles(free_nshp);
+
+ return ptp;
+}
+
+static ERTS_INLINE ErtsPortTask *
+select_task_for_exec(Port *pp,
+ ErtsPortTask **execqp,
+ int *processing_busy_q_p)
+{
+ ErtsPortTask *ptp;
+ erts_aint32_t flags;
+
+ flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
+
+ while (1) {
+ ptp = *execqp;
+ if (ptp)
+ *execqp = ptp->u.alive.next;
+ else {
+ ptp = fetch_in_queue(pp, execqp);
+ if (!ptp)
+ return NULL;
+ }
+ if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
+ return ptp;
+ }
+}
+
/*
* Abort a scheduled task.
*/
@@ -358,6 +1030,12 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
&erts_port_task_outstanding_io_tasks) > 0);
erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
break;
+ case ERTS_PORT_TASK_PROC_SIG:
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT,
+ &ptp->u.alive.td.psig.data);
+ break;
default:
break;
}
@@ -373,6 +1051,88 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
return res;
}
+void
+erts_port_task_abort_nosuspend_tasks(Port *pp)
+{
+ ErtsPortTaskHandleList *abort_list;
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
+#endif
+
+ erts_port_task_sched_lock(&pp->sched);
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ abort_list = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = NULL;
+ erts_port_task_sched_unlock(&pp->sched);
+
+ while (abort_list) {
+#ifdef DEBUG
+ ErtsPortTaskHandle *saved_pthp;
+#endif
+ ErtsPortTaskType type;
+ ErtsPortTaskTypeData td;
+ ErtsPortTaskHandle *pthp;
+ ErtsPortTask *ptp;
+ ErtsPortTaskHandleList *pthlp;
+ erts_aint32_t old_state;
+
+ pthlp = abort_list;
+ abort_list = pthlp->u.next;
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ dhndl = erts_thr_progress_unmanaged_delay();
+#endif
+
+ pthp = &pthlp->handle;
+ ptp = handle2task(pthp);
+ if (!ptp) {
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+ continue;
+ }
+
+#ifdef DEBUG
+ saved_pthp = ptp->u.alive.handle;
+ ERTS_SMP_READ_MEMORY_BARRIER;
+ old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (old_state == ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(saved_pthp == pthp);
+ }
+#endif
+
+ old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_ABORTED,
+ ERTS_PT_STATE_SCHEDULED);
+ if (old_state != ERTS_PT_STATE_SCHEDULED) {
+ /* Task already aborted, executing, or executed */
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+ continue;
+ }
+
+ reset_port_task_handle(pthp);
+
+ type = ptp->type;
+ td = ptp->u.alive.td;
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
+ schedule_port_task_handle_list_free(pthlp);
+
+ abort_nosuspend_task(type, &td);
+ }
+}
+
/*
* Schedule a task.
*/
@@ -416,18 +1176,23 @@ erts_port_task_schedule(Eterm id,
if (!pp)
goto fail;
- ptp = port_task_alloc();
+ if (type != ERTS_PORT_TASK_PROC_SIG) {
+ ptp = port_task_alloc();
- ptp->type = type;
+ ptp->type = type;
+ ptp->u.alive.flags = 0;
- erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+ erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+
+ set_handle(ptp, pthp);
+ }
switch (type) {
case ERTS_PORT_TASK_INPUT:
case ERTS_PORT_TASK_OUTPUT: {
va_list argp;
va_start(argp, type);
- ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
+ ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
va_end(argp);
erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
break;
@@ -435,18 +1200,32 @@ erts_port_task_schedule(Eterm id,
case ERTS_PORT_TASK_EVENT: {
va_list argp;
va_start(argp, type);
- ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
- ptp->u.alive.u.io.event_data = va_arg(argp, ErlDrvEventData);
+ ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
+ ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData);
va_end(argp);
erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
break;
}
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp;
+ va_list argp;
+ ASSERT(!pthp);
+ va_start(argp, type);
+ sigdp = va_arg(argp, ErtsProc2PortSigData *);
+ ptp = p2p_sig_data_to_task(sigdp);
+ ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
+ ptp->u.alive.flags |= va_arg(argp, int);
+ va_end(argp);
+ if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)
+ save_nosuspend_handle(pp, ptp);
+ else
+ set_handle(ptp, pthp);
+ break;
+ }
default:
break;
}
- set_handle(ptp, pthp);
-
act = enqueue_task(pp, ptp);
if (act & ERTS_PTS_FLG_EXIT) {
reset_handle(ptp);
@@ -572,6 +1351,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
ErtsPortTask *execq;
+ int processing_busy_q;
int res = 0;
int reds = ERTS_PORT_REDS_EXECUTE;
erts_aint_t io_tasks_executed = 0;
@@ -606,7 +1386,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_spin_unlock(&erts_sched_stat.lock);
}
- prepare_exec(pp, &execq);
+ prepare_exec(pp, &execq, &processing_busy_q);
erts_smp_port_lock(pp);
@@ -617,14 +1397,14 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
fpe_was_unmasked = erts_block_fpe();
- state = erts_atomic_read_nob(&pp->state);
+ state = erts_atomic32_read_nob(&pp->state);
goto begin_handle_tasks;
while (1) {
erts_aint32_t task_state;
ErtsPortTask *ptp;
- ptp = pop_task(pp, &execq);
+ ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
if (!ptp)
break;
@@ -645,40 +1425,44 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
switch (ptp->type) {
case ERTS_PORT_TASK_TIMEOUT:
reds += ERTS_PORT_REDS_TIMEOUT;
- if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
+ if (!(state & ERTS_PORT_SFLGS_DEAD)) {
DTRACE_DRIVER(driver_timeout, pp);
(*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
}
break;
case ERTS_PORT_TASK_INPUT:
reds += ERTS_PORT_REDS_INPUT;
- ASSERT((erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
(*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
- ptp->u.alive.u.io.event);
+ ptp->u.alive.td.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT:
reds += ERTS_PORT_REDS_OUTPUT;
- ASSERT((erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
(*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
- ptp->u.alive.u.io.event);
+ ptp->u.alive.td.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
reds += ERTS_PORT_REDS_EVENT;
- ASSERT((erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLGS_DEAD) == 0);
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
(*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
- ptp->u.alive.u.io.event,
- ptp->u.alive.u.io.event_data);
+ ptp->u.alive.td.io.event,
+ ptp->u.alive.td.io.event_data);
io_tasks_executed++;
break;
+ case ERTS_PORT_TASK_PROC_SIG:
+ ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ &ptp->u.alive.td.psig.data);
+ break;
case ERTS_PORT_TASK_DIST_CMD:
reds += erts_dist_command(pp, CONTEXT_REDS-reds);
break;
@@ -740,7 +1524,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif
- active = finalize_exec(pp, &execq);
+ active = finalize_exec(pp, &execq, processing_busy_q);
erts_port_release(pp);
@@ -858,24 +1642,30 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
break;
case ERTS_PORT_TASK_INPUT:
erts_stale_drv_select(pp->common.id,
- ptp->u.alive.u.io.event,
+ ptp->u.alive.td.io.event,
DO_READ,
1);
break;
case ERTS_PORT_TASK_OUTPUT:
erts_stale_drv_select(pp->common.id,
- ptp->u.alive.u.io.event,
+ ptp->u.alive.td.io.event,
DO_WRITE,
1);
break;
case ERTS_PORT_TASK_EVENT:
erts_stale_drv_select(pp->common.id,
- ptp->u.alive.u.io.event,
+ ptp->u.alive.td.io.event,
0,
1);
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
+ case ERTS_PORT_TASK_PROC_SIG:
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ &ptp->u.alive.td.psig.data);
+ break;
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
@@ -944,4 +1734,5 @@ erts_port_task_init(void)
erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
(erts_aint_t) 0);
init_port_task_alloc();
+ init_busy_caller_table_alloc();
}