aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/beam/dist.c2
-rw-r--r--erts/emulator/beam/dist.h9
-rw-r--r--erts/emulator/beam/erl_alloc.types1
-rw-r--r--erts/emulator/beam/erl_lock_check.c4
-rw-r--r--erts/emulator/beam/erl_port.h10
-rw-r--r--erts/emulator/beam/erl_port_task.c1022
-rw-r--r--erts/emulator/beam/erl_port_task.h71
-rw-r--r--erts/emulator/beam/erl_ptab.c16
-rw-r--r--erts/emulator/beam/io.c97
-rw-r--r--erts/emulator/beam/sys.h14
-rw-r--r--erts/emulator/sys/common/erl_check_io.c8
11 files changed, 614 insertions, 640 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 13715034cb..432b842848 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -458,7 +458,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
&& erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid)));
if (erts_port_task_is_scheduled(&dep->dist_cmd))
- erts_port_task_abort(dep->cid, &dep->dist_cmd);
+ erts_port_task_abort(&dep->dist_cmd);
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 7de8786c4a..2bc3d9c881 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -204,13 +204,8 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
id = dep->cid;
}
- if (!erts_smp_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) {
- (void) erts_port_task_schedule(id,
- &dep->dist_cmd,
- ERTS_PORT_TASK_DIST_CMD,
- (ErlDrvEvent) -1,
- NULL);
- }
+ if (!erts_smp_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1))
+ erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD);
}
#endif
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 92ba15214e..36beec9553 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -233,7 +233,6 @@ type DDLL_HANDLE STANDARD SYSTEM ddll_handle
type DDLL_ERRCODES LONG_LIVED SYSTEM ddll_errcodes
type DDLL_TMP_BUF TEMPORARY SYSTEM ddll_tmp_buf
type PORT_TASK SHORT_LIVED SYSTEM port_task
-type PORT_TASKQ SHORT_LIVED SYSTEM port_task_queue
type MISC_OP_LIST SHORT_LIVED SYSTEM misc_op_list
type PORT_NAMES SHORT_LIVED SYSTEM port_names
type PORT_DATA_LOCK DRIVER SYSTEM port_data_lock
diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c
index 06203a706d..2505c094ab 100644
--- a/erts/emulator/beam/erl_lock_check.c
+++ b/erts/emulator/beam/erl_lock_check.c
@@ -152,12 +152,10 @@ static erts_lc_lock_order_t erts_lock_order[] = {
{ "pmmap", NULL },
#endif
#ifdef ERTS_SMP
+ { "port_sched_lock", "port_id" },
{ "port_task_pre_alloc_lock", "address" },
- { "port_taskq_pre_alloc_lock", "address" },
{ "proclist_pre_alloc_lock", "address" },
- { "port_tasks_lock", NULL },
{ "port_table", NULL },
- { "port_state", "address" },
{ "xports_list_pre_alloc_lock", "address" },
{ "inet_buffer_stack_lock", NULL },
{ "gc_info", NULL },
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 3de0eed9ea..ce8d809245 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -224,13 +224,12 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
/* Immortal port (only certain system ports) */
#define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9))
#define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10))
-#define ERTS_PORT_SFLG_FREE_SCHEDULED ((Uint32) (1 << 11))
-#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 12))
+#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 11))
/* Port uses port specific locking (opposed to driver specific locking) */
-#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 13))
-#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 14))
+#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 12))
+#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 13))
/* Last port to terminate halts the emulator */
-#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 15))
+#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 14))
#ifdef DEBUG
/* Only debug: make sure all flags aren't cleared unintentionally */
#define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31))
@@ -239,7 +238,6 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
/* Combinations of port status flags */
#define ERTS_PORT_SFLGS_DEAD \
(ERTS_PORT_SFLG_FREE \
- | ERTS_PORT_SFLG_FREE_SCHEDULED \
| ERTS_PORT_SFLG_INITIALIZING)
#define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \
(ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID)
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 112d27c94e..edcd3112da 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -33,6 +33,7 @@
#include "erl_port_task.h"
#include "dist.h"
#include "dtrace-wrapper.h"
+#include <stdarg.h>
#if defined(DEBUG) && 0
#define HARD_DEBUG
@@ -41,18 +42,13 @@
/*
* Costs in reductions for some port operations.
*/
-#define ERTS_PORT_REDS_EXECUTE 0
-#define ERTS_PORT_REDS_FREE 50
-#define ERTS_PORT_REDS_TIMEOUT 200
-#define ERTS_PORT_REDS_INPUT 200
-#define ERTS_PORT_REDS_OUTPUT 200
-#define ERTS_PORT_REDS_EVENT 200
-#define ERTS_PORT_REDS_TERMINATE 100
-
-
-#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \
- ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \
- || (P)->common.id != (ID))
+#define ERTS_PORT_REDS_EXECUTE 10
+#define ERTS_PORT_REDS_FREE 100
+#define ERTS_PORT_REDS_TIMEOUT 400
+#define ERTS_PORT_REDS_INPUT 400
+#define ERTS_PORT_REDS_OUTPUT 400
+#define ERTS_PORT_REDS_EVENT 400
+#define ERTS_PORT_REDS_TERMINATE 200
#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP) \
@@ -70,82 +66,87 @@
erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
-struct ErtsPortTaskQueue_ {
- ErtsPortTask *first;
- ErtsPortTask *last;
-};
+#define ERTS_PT_STATE_SCHEDULED 0
+#define ERTS_PT_STATE_ABORTED 1
+#define ERTS_PT_STATE_EXECUTING 2
struct ErtsPortTask_ {
- ErtsPortTask *prev;
- ErtsPortTask *next;
- ErtsPortTaskQueue *queue;
- ErtsPortTaskHandle *handle;
+ erts_smp_atomic32_t state;
ErtsPortTaskType type;
- ErlDrvEvent event;
- ErlDrvEventData event_data;
+ union {
+ struct {
+ ErtsPortTask *next;
+ ErtsPortTaskHandle *handle;
+ union {
+ struct { /* I/O tasks */
+ ErlDrvEvent event;
+ ErlDrvEventData event_data;
+ } io;
+ } u;
+ } alive;
+ ErtsThrPrgrLaterOp release;
+ } u;
};
-#ifdef HARD_DEBUG
-#define ERTS_PT_CHK_PORTQ(RQ) check_port_queue((RQ), NULL, 0)
-#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP) check_port_queue((RQ), (PP), -1)
-#define ERTS_PT_CHK_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 1)
-#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 0)
-#define ERTS_PT_CHK_TASKQ(Q) check_task_queue((Q), NULL, 0)
-#define ERTS_PT_CHK_IN_TASKQ(Q, T) check_task_queue((Q), (T), 1)
-#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T) check_task_queue((Q), (T), 0)
-static void
-check_port_queue(Port *chk_pp, int inq);
-static void
-check_task_queue(ErtsPortTaskQueue *ptqp,
- ErtsPortTask *chk_ptp,
- int inq);
-#else
-#define ERTS_PT_CHK_PORTQ(RQ)
-#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_IN_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP)
-#define ERTS_PT_CHK_TASKQ(Q)
-#define ERTS_PT_CHK_IN_TASKQ(Q, T)
-#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T)
-#endif
-
-static void handle_remaining_tasks(ErtsRunQueue *runq, Port *pp);
+static void begin_port_cleanup(Port *pp, ErtsPortTask **execq);
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
ErtsPortTask,
- 200,
+ 1000,
ERTS_ALC_T_PORT_TASK)
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_taskq,
- ErtsPortTaskQueue,
- 50,
- ERTS_ALC_T_PORT_TASKQ)
+
+#ifdef ERTS_SMP
+static void
+call_port_task_free(void *vptp)
+{
+ port_task_free((ErtsPortTask *) vptp);
+}
+#endif
+
+static ERTS_INLINE void
+schedule_port_task_free(ErtsPortTask *ptp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(call_port_task_free,
+ (void *) ptp,
+ &ptp->u.release);
+#else
+ port_task_free(ptp);
+#endif
+}
/*
* Task handle manipulation.
*/
+static ERTS_INLINE void
+reset_port_task_handle(ErtsPortTaskHandle *pthp)
+{
+ erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL);
+}
+
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
- return (ErtsPortTask *) erts_smp_atomic_read_nob(pthp);
+ return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp);
}
static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
- if (ptp->handle) {
- ASSERT(ptp == handle2task(ptp->handle));
- erts_smp_atomic_set_nob(ptp->handle, (erts_aint_t) NULL);
+ if (ptp->u.alive.handle) {
+ ASSERT(ptp == handle2task(ptp->u.alive.handle));
+ reset_port_task_handle(ptp->u.alive.handle);
}
}
static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
- ptp->handle = pthp;
+ ptp->u.alive.handle = pthp;
if (pthp) {
- erts_smp_atomic_set_nob(pthp, (erts_aint_t) ptp);
- ASSERT(ptp == handle2task(ptp->handle));
+ erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
+ ASSERT(ptp == handle2task(ptp->u.alive.handle));
}
}
@@ -158,7 +159,6 @@ enqueue_port(ErtsRunQueue *runq, Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
pp->sched.next = NULL;
- pp->sched.in_runq = 1;
if (runq->ports.end) {
ASSERT(runq->ports.start);
runq->ports.end->sched.next = pp;
@@ -196,287 +196,181 @@ pop_port(ErtsRunQueue *runq)
return pp;
}
+/*
+ * Task queue operations
+ */
-#ifdef HARD_DEBUG
-
-static void
-check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq)
+static ERTS_INLINE erts_aint32_t
+enqueue_task(Port *pp, ErtsPortTask *ptp)
{
- Port *pp;
- Port *last_pp;
- Port *first_pp = runq->ports.start;
- int no_forward = 0, no_backward = 0;
- int found_forward = 0, found_backward = 0;
- if (!first_pp) {
- ASSERT(!runq->ports.end);
- }
- else {
- ASSERT(!first_pp->sched.prev);
- for (pp = first_pp; pp; pp = pp->sched.next) {
- ASSERT(pp->sched.taskq);
- if (pp->sched.taskq->first)
- no_forward++;
- if (chk_pp == pp)
- found_forward = 1;
- if (!pp->sched.prev) {
- ASSERT(first_pp == pp);
- }
- if (!pp->sched.next) {
- ASSERT(runq->ports.end == pp);
- last_pp = pp;
- }
- }
- for (pp = last_pp; pp; pp = pp->sched.prev) {
- ASSERT(pp->sched.taskq);
- if (pp->sched.taskq->last)
- no_backward++;
- if (chk_pp == pp)
- found_backward = 1;
- if (!pp->sched.prev) {
- ASSERT(first_pp == pp);
- }
- if (!pp->sched.next) {
- ASSERT(runq->ports.end == pp);
- }
- check_task_queue(pp->sched.taskq, NULL, 0);
- }
- ASSERT(no_forward == no_backward);
- }
- ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len));
- if (chk_pp) {
- if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) {
- ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq);
- }
- ASSERT(!chk_pp->sched.taskq || chk_pp->sched.taskq->first);
- if (inq < 0)
- inq = chk_pp->sched.taskq && !chk_pp->sched.exe_taskq;
- if (inq) {
- ASSERT(found_forward && found_backward);
+ erts_aint32_t flags;
+ ptp->u.alive.next = NULL;
+ erts_port_task_sched_lock(&pp->sched);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (!(flags & ERTS_PTS_FLG_EXIT)) {
+ if (pp->sched.taskq.in.last) {
+ ASSERT(pp->sched.taskq.in.first);
+ ASSERT(!pp->sched.taskq.in.last->u.alive.next);
+
+ pp->sched.taskq.in.last->u.alive.next = ptp;
}
else {
- ASSERT(!found_forward && !found_backward);
+ ASSERT(!pp->sched.taskq.in.first);
+
+ pp->sched.taskq.in.first = ptp;
}
+ pp->sched.taskq.in.last = ptp;
}
+ erts_port_task_sched_unlock(&pp->sched);
+ return flags;
}
-#endif
-
-/*
- * Task queue operations
- */
-
-static ERTS_INLINE ErtsPortTaskQueue *
-port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp)
+static ERTS_INLINE ErtsPortTask *
+pop_task(Port *pp, ErtsPortTask **execqp)
{
- if (ptqp) {
- ptqp->first = NULL;
- ptqp->last = NULL;
- }
- return ptqp;
-}
+ ErtsPortTask *ptp;
-static ERTS_INLINE void
-enqueue_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
-{
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- ptp->next = NULL;
- ptp->prev = ptqp->last;
- ptp->queue = ptqp;
- if (ptqp->last) {
- ASSERT(ptqp->first);
- ptqp->last->next = ptp;
- }
- else {
- ASSERT(!ptqp->first);
- ptqp->first = ptp;
+ ptp = *execqp;
+ if (ptp) {
+ *execqp = ptp->u.alive.next;
+ return ptp;
}
- ptqp->last = ptp;
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
+
+ ASSERT(!pp->sched.taskq.local);
+
+ erts_port_task_sched_lock(&pp->sched);
+ ptp = pp->sched.taskq.in.first;
+ pp->sched.taskq.in.first = NULL;
+ pp->sched.taskq.in.last = NULL;
+ if (ptp)
+ *execqp = ptp->u.alive.next;
+ else
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_TASKS);
+ erts_port_task_sched_unlock(&pp->sched);
+
+
+ return ptp;
}
static ERTS_INLINE void
-push_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp)
+prepare_exec(Port *pp, ErtsPortTask **execqp)
{
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- ptp->next = ptqp->first;
- ptp->prev = NULL;
- ptp->queue = ptqp;
- if (ptqp->first) {
- ASSERT(ptqp->last);
- ptqp->first->prev = ptp;
- }
- else {
- ASSERT(!ptqp->last);
- ptqp->last = ptp;
+ erts_aint32_t act;
+ *execqp = pp->sched.taskq.local;
+
+ /* guess a likely value */
+ act = ERTS_PTS_FLG_HAVE_TASKS|ERTS_PTS_FLG_IN_RUNQ;
+
+ while (1) {
+ erts_aint32_t new, exp;
+
+ new = exp = act;
+
+ new &= ~ERTS_PTS_FLG_IN_RUNQ;
+ new |= ERTS_PTS_FLG_EXEC;
+
+ act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);
+
+ ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);
+
+ if (exp == act)
+ break;
}
- ptqp->first = ptp;
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
}
-static ERTS_INLINE void
-dequeue_task(ErtsPortTask *ptp)
+/* finalize_exec() return value != 0 if port should remain active */
+static ERTS_INLINE int
+finalize_exec(Port *pp, ErtsPortTask **execq)
{
- ASSERT(ptp);
- ASSERT(ptp->queue);
- ERTS_PT_CHK_IN_TASKQ(ptp->queue, ptp);
- if (ptp->next)
- ptp->next->prev = ptp->prev;
- else {
- ASSERT(ptp->queue->last == ptp);
- ptp->queue->last = ptp->prev;
- }
- if (ptp->prev)
- ptp->prev->next = ptp->next;
- else {
- ASSERT(ptp->queue->first == ptp);
- ptp->queue->first = ptp->next;
- }
+ erts_aint32_t act;
- ASSERT(ptp->queue->first || !ptp->queue->last);
- ASSERT(ptp->queue->last || !ptp->queue->first);
- ERTS_PT_CHK_NOT_IN_TASKQ(ptp->queue, ptp);
-}
+ pp->sched.taskq.local = *execq;
+ *execq = NULL;
-static ERTS_INLINE ErtsPortTask *
-pop_task(ErtsPortTaskQueue *ptqp)
-{
- ErtsPortTask *ptp = ptqp->first;
- if (!ptp) {
- ASSERT(!ptqp->last);
- }
- else {
- ERTS_PT_CHK_IN_TASKQ(ptqp, ptp);
- ASSERT(!ptp->prev);
- ptqp->first = ptp->next;
- if (ptqp->first)
- ptqp->first->prev = NULL;
- else {
- ASSERT(ptqp->last == ptp);
- ptqp->last = NULL;
- }
- ASSERT(ptp->queue->first || !ptp->queue->last);
- ASSERT(ptp->queue->last || !ptp->queue->first);
- }
- ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp);
- return ptp;
-}
+ /* guess a likely value */
+ act = ERTS_PTS_FLG_EXEC;
+ if (execq)
+ act |= ERTS_PTS_FLG_HAVE_TASKS;
-#ifdef HARD_DEBUG
+ while (1) {
+ erts_aint32_t new, exp;
-static void
-check_task_queue(ErtsPortTaskQueue *ptqp,
- ErtsPortTask *chk_ptp,
- int inq)
-{
- ErtsPortTask *ptp;
- ErtsPortTask *last_ptp;
- ErtsPortTask *first_ptp = ptqp->first;
- int found_forward = 0, found_backward = 0;
- if (!first_ptp) {
- ASSERT(!ptqp->last);
- }
- else {
- ASSERT(!first_ptp->prev);
- for (ptp = first_ptp; ptp; ptp = ptp->next) {
- ASSERT(ptp->queue == ptqp);
- if (chk_ptp == ptp)
- found_forward = 1;
- if (!ptp->prev) {
- ASSERT(first_ptp == ptp);
- }
- if (!ptp->next) {
- ASSERT(ptqp->last == ptp);
- last_ptp = ptp;
- }
- }
- for (ptp = last_ptp; ptp; ptp = ptp->prev) {
- ASSERT(ptp->queue == ptqp);
- if (chk_ptp == ptp)
- found_backward = 1;
- if (!ptp->prev) {
- ASSERT(first_ptp == ptp);
- }
- if (!ptp->next) {
- ASSERT(ptqp->last == ptp);
- }
- }
- }
- if (chk_ptp) {
- if (inq) {
- ASSERT(found_forward && found_backward);
- }
- else {
- ASSERT(!found_forward && !found_backward);
- }
+ new = exp = act;
+
+ new &= ~ERTS_PTS_FLG_EXEC;
+ if (act & ERTS_PTS_FLG_HAVE_TASKS)
+ new |= ERTS_PTS_FLG_IN_RUNQ;
+
+ act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+
+ ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
+
+ if (exp == act)
+ break;
}
+
+ return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}
-#endif
/*
* Abort a scheduled task.
*/
int
-erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp)
+erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
- ErtsRunQueue *runq;
- ErtsPortTaskQueue *ptqp;
+ int res;
ErtsPortTask *ptp;
- Port *pp;
-
- pp = erts_port_lookup_raw(id);
- if (!pp)
- return 1;
-
- runq = erts_port_runq(pp);
- if (!runq)
- return 1;
+#ifdef ERTS_SMP
+ ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
+#endif
ptp = handle2task(pthp);
+ if (!ptp)
+ res = -1;
+ else {
+ erts_aint32_t old_state;
+
+#ifdef DEBUG
+ ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
+ ERTS_SMP_READ_MEMORY_BARRIER;
+ old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (old_state == ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(saved_pthp == pthp);
+ }
+#endif
- if (!ptp) {
- erts_smp_runq_unlock(runq);
- return 1;
- }
-
- ASSERT(ptp->handle == pthp);
- ptqp = ptp->queue;
- ASSERT(pp == ptqp->port);
+ old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_ABORTED,
+ ERTS_PT_STATE_SCHEDULED);
+ if (old_state != ERTS_PT_STATE_SCHEDULED)
+ res = - 1; /* Task already aborted, executing, or executed */
+ else {
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ASSERT(ptqp);
- ASSERT(ptqp->first);
+ reset_port_task_handle(pthp);
- dequeue_task(ptp);
- reset_handle(ptp);
+ switch (ptp->type) {
+ case ERTS_PORT_TASK_INPUT:
+ case ERTS_PORT_TASK_OUTPUT:
+ case ERTS_PORT_TASK_EVENT:
+ ASSERT(erts_smp_atomic_read_nob(
+ &erts_port_task_outstanding_io_tasks) > 0);
+ erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
+ break;
+ default:
+ break;
+ }
- switch (ptp->type) {
- case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- case ERTS_PORT_TASK_EVENT:
- ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) > 0);
- erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
- break;
- default:
- break;
+ res = 0;
+ }
}
- ASSERT(ptqp == pp->sched.taskq || ptqp == pp->sched.exe_taskq);
-
- if (ptqp->first || pp->sched.taskq != ptqp)
- ptqp = NULL;
- else
- pp->sched.taskq = NULL;
-
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-
- erts_smp_runq_unlock(runq);
-
- port_task_free(ptp);
- if (ptqp)
- port_taskq_free(ptqp);
+#ifdef ERTS_SMP
+ erts_thr_progress_unmanaged_continue(dhndl);
+#endif
- return 0;
+ return res;
}
/*
@@ -487,242 +381,209 @@ int
erts_port_task_schedule(Eterm id,
ErtsPortTaskHandle *pthp,
ErtsPortTaskType type,
- ErlDrvEvent event,
- ErlDrvEventData event_data)
+ ...)
{
+#ifdef ERTS_SMP
+ ErtsRunQueue *xrunq;
+ ErtsThrPrgrDelayHandle dhndl;
+#endif
ErtsRunQueue *runq;
Port *pp;
- ErtsPortTask *ptp;
- int enq_port = 0;
-
- /*
- * NOTE: We might not have the port lock here. We are only
- * allowed to access the 'sched', 'tab_status',
- * and 'id' fields of the port struct while
- * tasks_lock is held.
- */
+ ErtsPortTask *ptp = NULL;
+ erts_aint32_t act;
if (pthp && erts_port_task_is_scheduled(pthp)) {
ASSERT(0);
- erts_port_task_abort(id, pthp);
+ erts_port_task_abort(pthp);
}
- ptp = port_task_alloc();
-
ASSERT(is_internal_port(id));
- pp = erts_port_lookup_raw(id);
- if (!pp)
- return -1;
-
- runq = erts_port_runq(pp);
-
- if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) {
- if (runq)
- erts_smp_runq_unlock(runq);
- return -1;
- }
-
- ASSERT(!erts_port_task_is_scheduled(pthp));
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
+#ifdef ERTS_SMP
+ dhndl = erts_thr_progress_unmanaged_delay();
+#endif
- if (!pp->sched.taskq) {
- pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp);
- enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq;
- }
+ pp = erts_port_lookup_raw(id);
#ifdef ERTS_SMP
- if (enq_port) {
- ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
- if (xrunq) {
- /* Port emigrated ... */
- erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
- erts_smp_runq_unlock(runq);
- runq = erts_port_runq(pp);
- if (!runq)
- return -1;
- }
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
+ if (pp)
+ erts_port_inc_refc(pp);
+ erts_thr_progress_unmanaged_continue(dhndl);
}
#endif
- ASSERT(pp->sched.taskq);
- ASSERT(ptp);
+ if (!pp)
+ goto fail;
+
+ ptp = port_task_alloc();
ptp->type = type;
- ptp->event = event;
- ptp->event_data = event_data;
- set_handle(ptp, pthp);
+ erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
switch (type) {
- case ERTS_PORT_TASK_FREE:
- erl_exit(ERTS_ABORT_EXIT,
- "erts_port_task_schedule(): Cannot schedule free task\n");
- break;
case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- case ERTS_PORT_TASK_EVENT:
+ case ERTS_PORT_TASK_OUTPUT: {
+ va_list argp;
+ va_start(argp, type);
+ ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
+ va_end(argp);
erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
- /* Fall through... */
+ break;
+ }
+ case ERTS_PORT_TASK_EVENT: {
+ va_list argp;
+ va_start(argp, type);
+ ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent);
+ ptp->u.alive.u.io.event_data = va_arg(argp, ErlDrvEventData);
+ va_end(argp);
+ erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
+ break;
+ }
default:
- enqueue_task(pp->sched.taskq, ptp);
break;
}
-#ifndef ERTS_SMP
- /*
- * When (!enq_port && !pp->sched.exe_taskq) is true in the smp case,
- * the port might not be in the run queue. If this is the case, another
- * thread is in the process of enqueueing the port. This very seldom
- * occur, but do occur and is a valid scenario. Debug info showing this
- * enqueue in progress must be introduced before we can enable (modified
- * versions of these) assertions in the smp case again.
- */
-#if defined(HARD_DEBUG)
- if (pp->sched.exe_taskq || enq_port)
- ERTS_PT_CHK_NOT_IN_PORTQ(runq, pp);
- else
- ERTS_PT_CHK_IN_PORTQ(runq, pp);
-#elif defined(DEBUG)
- if (!enq_port && !pp->sched.exe_taskq) {
- /* We should be in port run q */
- ASSERT(pp->sched.in_runq);
- }
-#endif
-#endif
+ set_handle(ptp, pthp);
- if (!enq_port) {
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
+ act = enqueue_task(pp, ptp);
+ if (act & ERTS_PTS_FLG_EXIT) {
+ reset_handle(ptp);
+ goto fail;
}
- else {
- enqueue_port(runq, pp);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-
- if (erts_system_profile_flags.runnable_ports) {
- profile_runnable_port(pp, am_active);
+
+ while (1) {
+ erts_aint32_t new, exp;
+
+ if ((act & ERTS_PTS_FLG_HAVE_TASKS)
+ && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ goto done; /* Done */
+
+ new = exp = act;
+ new |= ERTS_PTS_FLG_HAVE_TASKS;
+ if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ new |= ERTS_PTS_FLG_IN_RUNQ;
+
+ act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+
+ if (exp == act) {
+ if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ break; /* Need to enqueue port */
+ goto done; /* Done */
}
+ if (act & ERTS_PTS_FLG_EXIT)
+ goto done; /* Died after our task insert... */
+ }
+
+ /* Enqueue port on run-queue */
+
+ runq = erts_port_runq(pp);
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+
+#ifdef ERTS_SMP
+ xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
+ if (xrunq) {
+ /* Port emigrated ... */
+ erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
erts_smp_runq_unlock(runq);
+ runq = erts_port_runq(pp);
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ }
+#endif
- erts_smp_notify_inc_runq(runq);
+ enqueue_port(runq, pp);
+
+ if (erts_system_profile_flags.runnable_ports) {
+ profile_runnable_port(pp, am_active);
}
+
+ erts_smp_runq_unlock(runq);
+
+ erts_smp_notify_inc_runq(runq);
+
+done:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
return 0;
+
+fail:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ if (ptp)
+ port_task_free(ptp);
+
+ return -1;
}
void
erts_port_task_free_port(Port *pp)
{
+ erts_aint32_t flags;
ErtsRunQueue *runq;
- ErtsPortTaskQueue *ptqp;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
+
runq = erts_port_runq(pp);
- ASSERT(runq);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ptqp = pp->sched.exe_taskq;
- if (ptqp) {
- /* I (this thread) am currently executing this port, free it
- when scheduled out... */
- ErtsPortTask *ptp;
- enqueue_free:
- ptp = port_task_alloc();
- erts_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- ptp->type = ERTS_PORT_TASK_FREE;
- ptp->event = (ErlDrvEvent) -1;
- ptp->event_data = NULL;
- set_handle(ptp, NULL);
- push_task(ptqp, ptp);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
- }
- else {
- if (pp->sched.in_runq) {
- ptqp = pp->sched.taskq;
- if (!ptqp)
- pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp);
- goto enqueue_free;
- }
- ASSERT(!pp->sched.taskq);
- erts_atomic32_read_bset_relb(&pp->state,
- (ERTS_PORT_SFLG_CLOSING
- | ERTS_PORT_SFLG_FREE_SCHEDULED),
- ERTS_PORT_SFLG_FREE_SCHEDULED);
- handle_remaining_tasks(runq, pp); /* May release runq lock */
- ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first));
- pp->sched.taskq = NULL;
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- erts_smp_runq_unlock(runq);
-#ifndef ERTS_SMP
- pp->cleanup = 1;
-#endif
- }
-}
+ if (!runq)
+ ERTS_INTERNAL_ERROR("Missing run-queue");
+ erts_port_task_sched_lock(&pp->sched);
+ flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_EXIT);
+ erts_port_task_sched_unlock(&pp->sched);
+ erts_atomic32_read_bset_relb(&pp->state,
+ (ERTS_PORT_SFLG_CLOSING
+ | ERTS_PORT_SFLG_FREE),
+ ERTS_PORT_SFLG_FREE);
-typedef struct {
- ErtsRunQueue *runq;
- int *resp;
-} ErtsPortTaskExeBlockData;
+ erts_smp_runq_unlock(runq);
+
+ if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
+ begin_port_cleanup(pp, NULL);
+}
/*
- * Run all scheduled tasks for the first port in run queue. If
- * new tasks appear while running reschedule port (free task is
- * an exception; it is always handled instantly).
+ * Execute scheduled tasks of a port.
*
* erts_port_task_execute() is called by scheduler threads between
- * scheduleing of processes. Sched lock should be held by caller.
+ * scheduling of processes. Run-queue lock should be held by caller.
*/
int
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
- ErtsPortTaskQueue *ptqp;
- ErtsPortTask *ptp;
+ ErtsPortTask *execq;
int res = 0;
int reds = ERTS_PORT_REDS_EXECUTE;
erts_aint_t io_tasks_executed = 0;
int fpe_was_unmasked;
+ erts_aint32_t state;
+ int active;
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
- ERTS_PT_CHK_PORTQ(runq);
-
pp = pop_port(runq);
if (!pp) {
res = 0;
goto done;
}
- if (erts_smp_port_trylock(pp) == EBUSY) {
- erts_smp_runq_unlock(runq);
- erts_smp_port_lock(pp);
- erts_smp_runq_lock(runq);
- }
-
- ASSERT(pp->sched.in_runq);
- pp->sched.in_runq = 0;
-
- if (!pp->sched.taskq) {
- if (erts_system_profile_flags.runnable_ports)
- profile_runnable_port(pp, am_inactive);
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
- goto release_port_done;
- }
+ erts_smp_runq_unlock(runq);
*curr_port_pp = pp;
-
- ASSERT(pp->sched.taskq->first);
- ptqp = pp->sched.taskq;
- pp->sched.taskq = NULL;
-
- ASSERT(!pp->sched.exe_taskq);
- pp->sched.exe_taskq = ptqp;
if (erts_sched_stat.enabled) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -739,49 +600,43 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_spin_unlock(&erts_sched_stat.lock);
}
+ prepare_exec(pp, &execq);
+
+ erts_smp_port_lock(pp);
+
/* trace port scheduling, in */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
trace_sched_ports(pp, am_in);
}
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ fpe_was_unmasked = erts_block_fpe();
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
- ptp = pop_task(ptqp);
+ state = erts_atomic_read_nob(&pp->state);
+ goto begin_handle_tasks;
- fpe_was_unmasked = erts_block_fpe();
+ while (1) {
+ erts_aint32_t task_state;
+ ErtsPortTask *ptp;
- while (ptp) {
- ASSERT(pp->sched.taskq != pp->sched.exe_taskq);
+ ptp = pop_task(pp, &execq);
+ if (!ptp)
+ break;
+
+ task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_EXECUTING,
+ ERTS_PT_STATE_SCHEDULED);
+ if (task_state != ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(task_state == ERTS_PT_STATE_ABORTED);
+ goto aborted_port_task;
+ }
reset_handle(ptp);
- erts_smp_runq_unlock(runq);
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
ERTS_SMP_CHK_NO_PROC_LOCKS;
ASSERT(pp->drv_ptr);
switch (ptp->type) {
- case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */
- reds += ERTS_PORT_REDS_FREE;
- erts_smp_runq_lock(runq);
-
- erts_unblock_fpe(fpe_was_unmasked);
- ASSERT(erts_atomic32_read_nob(&pp->state)
- & ERTS_PORT_SFLG_FREE_SCHEDULED);
- if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first))
- handle_remaining_tasks(runq, pp);
- ASSERT(!ptqp->first
- && (!pp->sched.taskq || !pp->sched.taskq->first));
-
- port_task_free(ptp);
- if (pp->sched.taskq)
- port_taskq_free(pp->sched.taskq);
- pp->sched.taskq = NULL;
-#ifndef ERTS_SMP
- pp->cleanup = 1;
-#endif
- goto tasks_done;
case ERTS_PORT_TASK_TIMEOUT:
reds += ERTS_PORT_REDS_TIMEOUT;
if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) {
@@ -795,7 +650,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_input, pp);
/* NOTE some windows drivers use ->ready_input for input and output */
- (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event);
+ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.u.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_OUTPUT:
@@ -803,7 +659,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_ready_output, pp);
- (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event);
+ (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.u.io.event);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_EVENT:
@@ -811,7 +668,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ASSERT((erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
DTRACE_DRIVER(driver_event, pp);
- (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data);
+ (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
+ ptp->u.alive.u.io.event,
+ ptp->u.alive.u.io.event_data);
io_tasks_executed++;
break;
case ERTS_PORT_TASK_DIST_CMD:
@@ -824,10 +683,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
break;
}
- if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING)
- && erts_is_port_ioq_empty(pp)) {
+ state = erts_atomic32_read_nob(&pp->state);
+ if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(pp)) {
reds += ERTS_PORT_REDS_TERMINATE;
erts_terminate_port(pp);
+ state = erts_atomic32_read_nob(&pp->state);
}
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
@@ -840,17 +700,29 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- port_task_free(ptp);
+ aborted_port_task:
+ schedule_port_task_free(ptp);
- erts_smp_runq_lock(runq);
+ begin_handle_tasks:
+ if (state & ERTS_PORT_SFLG_FREE) {
+ reds += ERTS_PORT_REDS_FREE;
- ptp = pop_task(ptqp);
- }
+ begin_port_cleanup(pp, &execq);
- tasks_done:
+ break;
+ }
+
+ if (reds >= CONTEXT_REDS)
+ break;
+ }
erts_unblock_fpe(fpe_was_unmasked);
+ /* trace port scheduling, out */
+ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
+ trace_sched_ports(pp, am_out);
+ }
+
if (io_tasks_executed) {
ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
>= io_tasks_executed);
@@ -858,15 +730,19 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
-1*io_tasks_executed);
}
- *curr_port_pp = NULL;
-
#ifdef ERTS_SMP
ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
#endif
- if (!pp->sched.taskq) {
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
+ active = finalize_exec(pp, &execq);
+
+ erts_port_release(pp);
+
+ *curr_port_pp = NULL;
+
+ erts_smp_runq_lock(runq);
+
+ if (!active) {
if (erts_system_profile_flags.runnable_ports)
profile_runnable_port(pp, am_inactive);
}
@@ -876,15 +752,12 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
#endif
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
- ASSERT(pp->sched.taskq->first);
#ifdef ERTS_SMP
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
if (!xrunq) {
#endif
enqueue_port(runq, pp);
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
/* No need to notify ourselves about inc in runq. */
#ifdef ERTS_SMP
}
@@ -894,35 +767,20 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_smp_runq_unlock(runq);
xrunq = erts_port_runq(pp);
- if (xrunq) {
- enqueue_port(xrunq, pp);
- ASSERT(pp->sched.exe_taskq);
- pp->sched.exe_taskq = NULL;
- erts_smp_runq_unlock(xrunq);
- erts_smp_notify_inc_runq(xrunq);
- }
+ ASSERT(xrunq);
+ enqueue_port(xrunq, pp);
+ erts_smp_runq_unlock(xrunq);
+ erts_smp_notify_inc_runq(xrunq);
erts_smp_runq_lock(runq);
}
#endif
}
+ done:
res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
!= (erts_aint_t) 0);
- ERTS_PT_CHK_PRES_PORTQ(runq, pp);
-
- port_taskq_free(ptqp);
-
- /* trace port scheduling, out */
- if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
- trace_sched_ports(pp, am_out);
- }
-
-release_port_done:
- erts_port_release(pp);
-
- done:
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds);
@@ -930,47 +788,83 @@ release_port_done:
return res;
}
-/*
- * Handle remaining tasks after a free task.
- */
+#ifdef ERTS_SMP
+static void
+release_port(void *vport)
+{
+ erts_port_dec_refc((Port *) vport);
+}
+#endif
static void
-handle_remaining_tasks(ErtsRunQueue *runq, Port *pp)
+begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
{
- int i;
- ErtsPortTask *ptp;
- ErtsPortTaskQueue *ptqps[] = {pp->sched.exe_taskq, pp->sched.taskq};
+ int i, max;
+ ErtsPortTask *qs[2];
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- for (i = 0; i < sizeof(ptqps)/sizeof(ErtsPortTaskQueue *); i++) {
- if (!ptqps[i])
- continue;
- ptp = pop_task(ptqps[i]);
- while (ptp) {
+ /*
+ * Handle remaining tasks...
+ */
+
+ max = 0;
+ if (execqp && *execqp) {
+ qs[max++] = *execqp;
+ *execqp = NULL;
+ }
+
+ erts_port_task_sched_lock(&pp->sched);
+ qs[max] = pp->sched.taskq.in.first;
+ pp->sched.taskq.in.first = NULL;
+ pp->sched.taskq.in.last = NULL;
+ erts_port_task_sched_unlock(&pp->sched);
+ if (qs[max])
+ max++;
+
+ for (i = 0; i < max; i++) {
+ while (1) {
+ erts_aint32_t state;
+ ErtsPortTask *ptp = qs[i];
+ if (!ptp)
+ break;
+
+ qs[i] = ptp->u.alive.next;
+
+ /* Normal case here is aborted tasks... */
+ state = erts_smp_atomic32_read_nob(&ptp->state);
+ if (state == ERTS_PT_STATE_ABORTED)
+ goto aborted_port_task;
+
+ state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ ERTS_PT_STATE_EXECUTING,
+ ERTS_PT_STATE_SCHEDULED);
+ if (state != ERTS_PT_STATE_SCHEDULED) {
+ ASSERT(state == ERTS_PT_STATE_ABORTED);
+ goto aborted_port_task;
+ }
+
reset_handle(ptp);
- erts_smp_runq_unlock(runq);
switch (ptp->type) {
- case ERTS_PORT_TASK_FREE:
case ERTS_PORT_TASK_TIMEOUT:
break;
case ERTS_PORT_TASK_INPUT:
erts_stale_drv_select(pp->common.id,
- ptp->event,
+ ptp->u.alive.u.io.event,
DO_READ,
1);
break;
case ERTS_PORT_TASK_OUTPUT:
erts_stale_drv_select(pp->common.id,
- ptp->event,
+ ptp->u.alive.u.io.event,
DO_WRITE,
1);
break;
case ERTS_PORT_TASK_EVENT:
erts_stale_drv_select(pp->common.id,
- ptp->event,
+ ptp->u.alive.u.io.event,
0,
1);
break;
@@ -982,35 +876,41 @@ handle_remaining_tasks(ErtsRunQueue *runq, Port *pp)
(int) ptp->type);
}
- port_task_free(ptp);
-
- erts_smp_runq_lock(runq);
- ptp = pop_task(ptqps[i]);
+ aborted_port_task:
+ schedule_port_task_free(ptp);
}
}
- ASSERT(!pp->sched.taskq || !pp->sched.taskq->first);
+ erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_TASKS);
+
+ /*
+ * Schedule cleanup of port structure...
+ */
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(release_port,
+ (void *) pp,
+ &pp->common.u.release);
+#else
+ pp->cleanup = 1;
+#endif
}
int
erts_port_is_scheduled(Port *pp)
{
- int res;
- ErtsRunQueue *runq = erts_port_runq(pp);
- if (!runq)
- return 0;
- res = pp->sched.taskq || pp->sched.exe_taskq;
- erts_smp_runq_unlock(runq);
- return res;
+ erts_aint32_t flags = erts_smp_atomic32_read_acqb(&pp->sched.flags);
+ return (flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) != 0;
}
#ifdef ERTS_SMP
+
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(pp->sched.in_runq);
+ ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
enqueue_port(rq, pp);
}
@@ -1022,7 +922,8 @@ erts_dequeue_port(ErtsRunQueue *rq)
pp = pop_port(rq);
ASSERT(!pp
|| rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(!pp || pp->sched.in_runq);
+ ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags)
+ & ERTS_PTS_FLG_IN_RUNQ));
return pp;
}
@@ -1037,5 +938,4 @@ erts_port_task_init(void)
erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
(erts_aint_t) 0);
init_port_task_alloc();
- init_port_taskq_alloc();
}
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index fd88b1c1ff..3e87d5b778 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 2006-2011. All Rights Reserved.
+ * Copyright Ericsson AB 2006-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
@@ -44,7 +44,6 @@ typedef erts_smp_atomic_t ErtsPortTaskHandle;
#endif
typedef enum {
- ERTS_PORT_TASK_FREE,
ERTS_PORT_TASK_INPUT,
ERTS_PORT_TASK_OUTPUT,
ERTS_PORT_TASK_EVENT,
@@ -57,19 +56,36 @@ typedef enum {
extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#endif
+#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
+#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
+#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
+#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
+
typedef struct ErtsPortTask_ ErtsPortTask;
-typedef struct ErtsPortTaskQueue_ ErtsPortTaskQueue;
typedef struct {
Port *next;
- int in_runq;
- ErtsPortTaskQueue *taskq;
- ErtsPortTaskQueue *exe_taskq;
+ struct {
+ ErtsPortTask *local;
+ struct {
+ ErtsPortTask *first;
+ ErtsPortTask *last;
+ } in;
+ } taskq;
+ erts_smp_atomic32_t flags;
+#ifdef ERTS_SMP
+ erts_mtx_t mtx;
+#endif
} ErtsPortTaskSched;
ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
-ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
+ Eterm id);
+ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_sched_lock(ErtsPortTaskSched *ptsp);
+ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
+
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
#endif
@@ -89,12 +105,40 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
}
ERTS_GLB_INLINE void
-erts_port_task_init_sched(ErtsPortTaskSched *ptsp)
+erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
{
ptsp->next = NULL;
- ptsp->in_runq = 0;
- ptsp->taskq = NULL;
- ptsp->exe_taskq = NULL;
+ ptsp->taskq.local = NULL;
+ ptsp->taskq.in.first = NULL;
+ ptsp->taskq.in.last = NULL;
+ erts_smp_atomic32_init_nob(&ptsp->flags, 0);
+#ifdef ERTS_SMP
+ erts_mtx_init_x(&ptsp->mtx, "port_sched_lock", instr_id);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_port_task_sched_lock(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_lock(&ptsp->mtx);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_unlock(&ptsp->mtx);
+#endif
+}
+
+ERTS_GLB_INLINE void
+erts_port_task_fini_sched(ErtsPortTaskSched *ptsp)
+{
+#ifdef ERTS_SMP
+ erts_mtx_destroy(&ptsp->mtx);
+#endif
}
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
@@ -115,12 +159,11 @@ int erts_port_task_execute(ErtsRunQueue *, Port **);
void erts_port_task_init(void);
#endif
-int erts_port_task_abort(Eterm id, ErtsPortTaskHandle *);
+int erts_port_task_abort(ErtsPortTaskHandle *);
int erts_port_task_schedule(Eterm,
ErtsPortTaskHandle *,
ErtsPortTaskType,
- ErlDrvEvent,
- ErlDrvEventData);
+ ...);
void erts_port_task_free_port(Port *);
int erts_port_is_scheduled(Port *);
diff --git a/erts/emulator/beam/erl_ptab.c b/erts/emulator/beam/erl_ptab.c
index 7ab756ae8d..8dcdf94c80 100644
--- a/erts/emulator/beam/erl_ptab.c
+++ b/erts/emulator/beam/erl_ptab.c
@@ -415,11 +415,6 @@ last_data_cmp(Uint64 ld1, Uint64 ld2)
#define ERTS_PTAB_LastData2EtermData(LD) \
((Eterm) ((LD) & ~(~((Uint64) 0) << ERTS_PTAB_ID_DATA_SIZE)))
-static void noop(void *unused)
-{
-
-}
-
void
erts_ptab_init_table(ErtsPTab *ptab,
ErtsAlcType_t atype,
@@ -484,10 +479,6 @@ erts_ptab_init_table(ErtsPTab *ptab,
ptab->r.o.invalid_element = invalid_element;
ptab->r.o.invalid_data = erts_ptab_id2data(ptab, invalid_element->id);
ptab->r.o.release_element = release_element;
- if (release_element)
- ptab->r.o.release_element = release_element;
- else
- ptab->r.o.release_element = noop;
erts_smp_interval_init(&ptab->list.data.interval);
ptab->list.data.deleted.start = NULL;
@@ -678,9 +669,10 @@ erts_ptab_delete_element(ErtsPTab *ptab,
erts_ptab_rwunlock(ptab);
}
- erts_schedule_thr_prgr_later_op(ptab->r.o.release_element,
- (void *) ptab_el,
- &ptab_el->u.release);
+ if (ptab->r.o.release_element)
+ erts_schedule_thr_prgr_later_op(ptab->r.o.release_element,
+ (void *) ptab_el,
+ &ptab_el->u.release);
}
/*
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 1177e7e691..78c27b1385 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -197,19 +197,58 @@ erts_lc_is_port_locked(Port *prt)
static void initq(Port* prt);
+#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT)
+#define ERTS_PORT_INIT_INSTR_NEED_ID 1
+#else
+#define ERTS_PORT_INIT_INSTR_NEED_ID 0
+#endif
+
+static ERTS_INLINE void port_init_instr(Port *prt
+#if ERTS_PORT_INIT_INSTR_NEED_ID
+ , Eterm id
+#endif
+ )
+{
+#if !ERTS_PORT_INIT_INSTR_NEED_ID
+ Eterm id = NIL; /* Not used */
+#endif
+
+ /*
+ * Stuff that need to be initialized with the port id
+ * in the instrumented case, but not in the normal case.
+ */
+#ifdef ERTS_SMP
+ ASSERT(prt->drv_ptr && prt->lock);
+ if (!prt->drv_ptr->lock)
+ erts_mtx_init_locked_x(prt->lock, "port_lock", id);
+#endif
+ erts_port_task_init_sched(&prt->sched, id);
+}
+
+#if !ERTS_PORT_INIT_INSTR_NEED_ID
+static ERTS_INLINE void port_init_instr_abort(Port *prt)
+{
+#ifdef ERTS_SMP
+ ASSERT(prt->drv_ptr && prt->lock);
+ if (!prt->drv_ptr->lock) {
+ erts_mtx_unlock(prt->lock);
+ erts_mtx_destroy(prt->lock);
+ }
+#endif
+ erts_port_task_fini_sched(&prt->sched);
+}
+#endif
+
static void insert_port_struct(void *vprt, Eterm data)
{
Port *prt = (Port *) vprt;
Eterm id = make_internal_port(data);
-#ifdef ERTS_SMP
- ASSERT(prt->drv_ptr && prt->lock);
+#if ERTS_PORT_INIT_INSTR_NEED_ID
/*
- * We are breaking lock order in the port specific locking
- * case. This is, however, safe since the lock has not been
- * published, yet.
+ * This cannot be done earlier in the instrumented
+ * case since we don't now 'id' until now.
*/
- if (!prt->drv_ptr->lock)
- erts_mtx_init_locked_x(prt->lock, "port_lock", id);
+ port_init_instr(prt, id);
#endif
prt->common.id = id;
erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING);
@@ -292,7 +331,6 @@ static Port *create_port(char *name,
sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer));
#endif
erts_port_task_handle_init(&prt->timeout_task);
- erts_port_task_init_sched(&prt->sched);
prt->psd = NULL;
prt->drv_data = (SWord) 0;
@@ -301,10 +339,23 @@ static Port *create_port(char *name,
ASSERT(((char *) prt) == ((char *) &prt->common));
+#if !ERTS_PORT_INIT_INSTR_NEED_ID
+ /*
+ * When 'id' isn't needed (the normal case), it is better to
+ * do the initialization here avoiding unnecessary contention
+ * on table...
+ */
+ port_init_instr(prt);
+#endif
+
if (!erts_ptab_new_element(&erts_port,
&prt->common,
(void *) prt,
insert_port_struct)) {
+
+#if !ERTS_PORT_INIT_INSTR_NEED_ID
+ port_init_instr_abort(prt);
+#endif
#ifdef ERTS_SMP
if (driver_lock)
erts_mtx_unlock(driver_lock);
@@ -343,10 +394,9 @@ erts_port_free(Port *prt)
#if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK)
erts_aint32_t state = erts_atomic32_read_nob(&prt->state);
#endif
- ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED
- | ERTS_PORT_SFLG_INITIALIZING));
+ ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_INITIALIZING
+ | ERTS_PORT_SFLG_FREE));
ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG);
- ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE));
#ifdef ERTS_SMP
ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0);
@@ -354,6 +404,8 @@ erts_port_free(Port *prt)
ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0);
#endif
+ erts_port_task_fini_sched(&prt->sched);
+
#ifdef ERTS_SMP
ASSERT(prt->lock);
if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK)
@@ -372,7 +424,6 @@ erts_port_free(Port *prt)
if (prt->drv_ptr->handle)
erts_ddll_dereference_driver(prt->drv_ptr->handle);
#endif
- erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE);
erts_free(ERTS_ALC_T_PORT, prt);
}
@@ -1230,14 +1281,6 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list)
}
}
-#ifdef ERTS_SMP
-static void
-release_port(void *vport)
-{
- erts_port_dec_refc((Port *) vport);
-}
-#endif
-
void erts_init_io(int port_tab_size,
int port_tab_size_ignore_files)
{
@@ -1267,11 +1310,7 @@ void erts_init_io(int port_tab_size,
erts_ptab_init_table(&erts_port,
ERTS_ALC_T_PORT_TABLE,
-#ifdef ERTS_SMP
- release_port,
-#else
NULL,
-#endif
(ErtsPTabElementCommon *) &erts_invalid_port.common,
port_tab_size,
"port_table");
@@ -2727,11 +2766,9 @@ static void schedule_port_timeout(Port *p)
* /Rickard
*/
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p));
- (void) erts_port_task_schedule(p->common.id,
- &p->timeout_task,
- ERTS_PORT_TASK_TIMEOUT,
- (ErlDrvEvent) -1,
- NULL);
+ erts_port_task_schedule(p->common.id,
+ &p->timeout_task,
+ ERTS_PORT_TASK_TIMEOUT);
}
ErlDrvTermData driver_mk_term_nil(void)
@@ -4198,7 +4235,7 @@ drv_cancel_timer(Port *prt)
erts_cancel_timer(&prt->common.u.alive.tm);
#endif
if (erts_port_task_is_scheduled(&prt->timeout_task))
- erts_port_task_abort(prt->common.id, &prt->timeout_task);
+ erts_port_task_abort(&prt->timeout_task);
}
int driver_set_timer(ErlDrvPort ix, unsigned long t)
diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h
index 12313f0984..fd6f34e96e 100644
--- a/erts/emulator/beam/sys.h
+++ b/erts/emulator/beam/sys.h
@@ -123,6 +123,16 @@ typedef ERTS_SYS_FD_TYPE ErtsSysFdType;
# define ERTS_DECLARE_DUMMY(X) X
#endif
+#if !defined(__func__)
+# if !defined(__STDC_VERSION__) || __STDC_VERSION__ < 199901L
+# if !defined(__GNUC__) || __GNUC__ < 2
+# define __func__ "[unknown_function]"
+# else
+# define __func__ __FUNCTION__
+# endif
+# endif
+#endif
+
#if defined(DEBUG) || defined(ERTS_ENABLE_LOCK_CHECK)
# undef ERTS_CAN_INLINE
# define ERTS_CAN_INLINE 0
@@ -553,6 +563,10 @@ __decl_noreturn void __noreturn erl_exit(int n, char*, ...);
#define ERTS_ABORT_EXIT (INT_MIN + 1) /* no crash dump; only abort() */
#define ERTS_DUMP_EXIT (INT_MIN + 2) /* crash dump; then exit() */
+#define ERTS_INTERNAL_ERROR(What) \
+ erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Internal error: %s", \
+ __FILE__, __LINE__, __func__, What)
+
Eterm erts_check_io_info(void *p);
/* Size of misc memory allocated from system dependent code */
diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c
index 9ef861bfe4..9c78fe713b 100644
--- a/erts/emulator/sys/common/erl_check_io.c
+++ b/erts/emulator/sys/common/erl_check_io.c
@@ -366,7 +366,7 @@ abort_task(Eterm id, ErtsPortTaskHandle *pthp, EventStateType type)
|| !erts_port_task_is_scheduled(pthp));
}
else if (erts_port_task_is_scheduled(pthp)) {
- erts_port_task_abort(id, pthp);
+ erts_port_task_abort(pthp);
ASSERT(erts_is_port_alive(id));
}
}
@@ -1088,8 +1088,7 @@ iready(Eterm id, ErtsDrvEventState *state)
if (erts_port_task_schedule(id,
&state->driver.select->intask,
ERTS_PORT_TASK_INPUT,
- (ErlDrvEvent) state->fd,
- NULL) != 0) {
+ (ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_READ);
}
}
@@ -1100,8 +1099,7 @@ oready(Eterm id, ErtsDrvEventState *state)
if (erts_port_task_schedule(id,
&state->driver.select->outtask,
ERTS_PORT_TASK_OUTPUT,
- (ErlDrvEvent) state->fd,
- NULL) != 0) {
+ (ErlDrvEvent) state->fd) != 0) {
stale_drv_select(id, state, ERL_DRV_WRITE);
}
}