aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.h
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port_task.h')
-rw-r--r--erts/emulator/beam/erl_port_task.h160
1 files changed, 145 insertions, 15 deletions
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index fd88b1c1ff..ae6cd69ae2 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2006-2011. All Rights Reserved.
+ * Copyright Ericsson AB 2006-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -27,6 +27,9 @@
#define ERTS_PORT_TASK_H_BASIC_TYPES__
#include "erl_sys_driver.h"
#include "erl_smp.h"
+#define ERL_PORT_GET_PORT_TYPE_ONLY__
+#include "erl_port.h"
+#undef ERL_PORT_GET_PORT_TYPE_ONLY__
typedef erts_smp_atomic_t ErtsPortTaskHandle;
#endif
@@ -43,13 +46,19 @@ typedef erts_smp_atomic_t ErtsPortTaskHandle;
#define ERTS_INCLUDE_SCHEDULER_INTERNALS
#endif
+#define ERTS_PT_FLG_WAIT_BUSY (1 << 0)
+#define ERTS_PT_FLG_SIG_DEP (1 << 1)
+#define ERTS_PT_FLG_NOSUSPEND (1 << 2)
+#define ERTS_PT_FLG_REF (1 << 3)
+#define ERTS_PT_FLG_BAD_OUTPUT (1 << 4)
+
typedef enum {
- ERTS_PORT_TASK_FREE,
ERTS_PORT_TASK_INPUT,
ERTS_PORT_TASK_OUTPUT,
ERTS_PORT_TASK_EVENT,
ERTS_PORT_TASK_TIMEOUT,
- ERTS_PORT_TASK_DIST_CMD
+ ERTS_PORT_TASK_DIST_CMD,
+ ERTS_PORT_TASK_PROC_SIG
} ErtsPortTaskType;
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
@@ -57,19 +66,76 @@ typedef enum {
extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#endif
+#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
+#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
+#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
+#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
+#define ERTS_PTS_FLG_BUSY_PORT (((erts_aint32_t) 1) << 4)
+#define ERTS_PTS_FLG_BUSY_PORT_Q (((erts_aint32_t) 1) << 5)
+#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q (((erts_aint32_t) 1) << 6)
+#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 7)
+#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 8)
+#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 9)
+#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 10)
+
+#define ERTS_PTS_FLGS_BUSY \
+ (ERTS_PTS_FLG_BUSY_PORT | ERTS_PTS_FLG_BUSY_PORT_Q)
+
+#define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP \
+ (ERTS_PTS_FLG_EXIT \
+ | ERTS_PTS_FLG_HAVE_BUSY_TASKS \
+ | ERTS_PTS_FLG_HAVE_TASKS \
+ | ERTS_PTS_FLG_EXEC \
+ | ERTS_PTS_FLG_FORCE_SCHED)
+
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH 8192
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW 4096
+
+typedef struct {
+ ErlDrvSizeT high;
+ erts_smp_atomic_t low;
+ erts_smp_atomic_t size;
+} ErtsPortTaskBusyPortQ;
+
typedef struct ErtsPortTask_ ErtsPortTask;
-typedef struct ErtsPortTaskQueue_ ErtsPortTaskQueue;
+typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable;
+typedef struct ErtsPortTaskHandleList_ ErtsPortTaskHandleList;
typedef struct {
Port *next;
- int in_runq;
- ErtsPortTaskQueue *taskq;
- ErtsPortTaskQueue *exe_taskq;
+ struct {
+ struct {
+ struct {
+ ErtsPortTask *first;
+ ErtsPortTask *last;
+ ErtsPortTaskBusyCallerTable *table;
+ ErtsPortTaskHandleList *nosuspend;
+ } busy;
+ ErtsPortTask *first;
+ } local;
+ struct {
+ ErtsPortTask *first;
+ ErtsPortTask *last;
+ } in;
+ ErtsPortTaskBusyPortQ *bpq;
+ } taskq;
+ erts_smp_atomic32_t flags;
+#ifdef ERTS_SMP
+ erts_mtx_t mtx;
+#endif
} ErtsPortTaskSched;
ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
-ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq);
+ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
+ Eterm id);
+ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_sched_lock(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp);
+
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
#endif
@@ -88,13 +154,75 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
return ((void *) erts_smp_atomic_read_nob(pthp)) != NULL;
}
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq)
+{
+ if (bpq) {
+ erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW;
+ erts_smp_atomic_init_nob(&bpq->low, low);
+ bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH;
+ erts_smp_atomic_init_nob(&bpq->size, (erts_aint_t) 0);
+ }
+ ptsp->taskq.bpq = bpq;
+}
+
ERTS_GLB_INLINE void
-erts_port_task_init_sched(ErtsPortTaskSched *ptsp)
+erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
{
+#ifdef ERTS_SMP
+ char *lock_str = "port_sched_lock";
+#endif
ptsp->next = NULL;
- ptsp->in_runq = 0;
- ptsp->taskq = NULL;
- ptsp->exe_taskq = NULL;
+ ptsp->taskq.local.busy.first = NULL;
+ ptsp->taskq.local.busy.last = NULL;
+ ptsp->taskq.local.busy.table = NULL;
+ ptsp->taskq.local.busy.nosuspend = NULL;
+ ptsp->taskq.local.first = NULL;
+ ptsp->taskq.in.first = NULL;
+ ptsp->taskq.in.last = NULL;
+ erts_smp_atomic32_init_nob(&ptsp->flags, 0);
+#ifdef ERTS_SMP
+#ifdef ERTS_ENABLE_LOCK_COUNT
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PORTLOCK))
+ lock_str = NULL;
+#endif
+ erts_mtx_init_x(&ptsp->mtx, lock_str, instr_id);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_port_task_sched_lock(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_lock(&ptsp->mtx);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_unlock(&ptsp->mtx);
+#endif
+}
+
+ERTS_GLB_INLINE int
+erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp)
+{
+#if defined(ERTS_SMP) && defined(ERTS_ENABLE_LOCK_CHECK)
+ return erts_lc_mtx_is_locked(&ptsp->mtx);
+#else
+ return 0;
+#endif
+}
+
+
+ERTS_GLB_INLINE void
+erts_port_task_fini_sched(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_destroy(&ptsp->mtx);
+#endif
}
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
@@ -115,14 +243,16 @@ int erts_port_task_execute(ErtsRunQueue *, Port **);
void erts_port_task_init(void);
#endif
-int erts_port_task_abort(Eterm id, ErtsPortTaskHandle *);
+int erts_port_task_abort(ErtsPortTaskHandle *);
+void erts_port_task_abort_nosuspend_tasks(Port *);
+
int erts_port_task_schedule(Eterm,
ErtsPortTaskHandle *,
ErtsPortTaskType,
- ErlDrvEvent,
- ErlDrvEventData);
+ ...);
void erts_port_task_free_port(Port *);
int erts_port_is_scheduled(Port *);
+ErtsProc2PortSigData *erts_port_task_alloc_p2p_sig_data(void);
#ifdef ERTS_SMP
void erts_enqueue_port(ErtsRunQueue *rq, Port *pp);