From 56cef897ca3ad2377e34a6ea5800a54a28cbeb6e Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 20 Aug 2012 13:48:29 +0200 Subject: Optimize management of port tasks --- erts/emulator/beam/dist.c | 2 +- erts/emulator/beam/dist.h | 9 +- erts/emulator/beam/erl_alloc.types | 1 - erts/emulator/beam/erl_lock_check.c | 4 +- erts/emulator/beam/erl_port.h | 10 +- erts/emulator/beam/erl_port_task.c | 1022 ++++++++++++++----------------- erts/emulator/beam/erl_port_task.h | 71 ++- erts/emulator/beam/erl_ptab.c | 16 +- erts/emulator/beam/io.c | 97 ++- erts/emulator/beam/sys.h | 14 + erts/emulator/sys/common/erl_check_io.c | 8 +- 11 files changed, 614 insertions(+), 640 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 13715034cb..432b842848 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -458,7 +458,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) && erts_lc_is_port_locked(erts_port_lookup_raw(dep->cid))); if (erts_port_task_is_scheduled(&dep->dist_cmd)) - erts_port_task_abort(dep->cid, &dep->dist_cmd); + erts_port_task_abort(&dep->dist_cmd); if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 7de8786c4a..2bc3d9c881 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -204,13 +204,8 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) id = dep->cid; } - if (!erts_smp_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) { - (void) erts_port_task_schedule(id, - &dep->dist_cmd, - ERTS_PORT_TASK_DIST_CMD, - (ErlDrvEvent) -1, - NULL); - } + if (!erts_smp_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) + erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD); } #endif diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 92ba15214e..36beec9553 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -233,7 +233,6 @@ type DDLL_HANDLE STANDARD SYSTEM ddll_handle type DDLL_ERRCODES LONG_LIVED SYSTEM ddll_errcodes type DDLL_TMP_BUF TEMPORARY SYSTEM ddll_tmp_buf type PORT_TASK SHORT_LIVED SYSTEM port_task -type PORT_TASKQ SHORT_LIVED SYSTEM port_task_queue type MISC_OP_LIST SHORT_LIVED SYSTEM misc_op_list type PORT_NAMES SHORT_LIVED SYSTEM port_names type PORT_DATA_LOCK DRIVER SYSTEM port_data_lock diff --git a/erts/emulator/beam/erl_lock_check.c b/erts/emulator/beam/erl_lock_check.c index 06203a706d..2505c094ab 100644 --- a/erts/emulator/beam/erl_lock_check.c +++ b/erts/emulator/beam/erl_lock_check.c @@ -152,12 +152,10 @@ static erts_lc_lock_order_t erts_lock_order[] = { { "pmmap", NULL }, #endif #ifdef ERTS_SMP + { "port_sched_lock", "port_id" }, { "port_task_pre_alloc_lock", "address" }, - { "port_taskq_pre_alloc_lock", "address" }, { "proclist_pre_alloc_lock", "address" }, - { "port_tasks_lock", NULL }, { "port_table", NULL }, - { "port_state", "address" }, { "xports_list_pre_alloc_lock", "address" }, { "inet_buffer_stack_lock", NULL }, { "gc_info", NULL }, diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 3de0eed9ea..ce8d809245 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -224,13 +224,12 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ /* Immortal port (only certain system ports) */ #define ERTS_PORT_SFLG_IMMORTAL ((Uint32) (1 << 9)) #define ERTS_PORT_SFLG_FREE ((Uint32) (1 << 10)) -#define ERTS_PORT_SFLG_FREE_SCHEDULED ((Uint32) (1 << 11)) -#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 12)) +#define ERTS_PORT_SFLG_INITIALIZING ((Uint32) (1 << 11)) /* Port uses port specific locking (opposed to driver specific locking) */ -#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 13)) -#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 14)) +#define ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK ((Uint32) (1 << 12)) +#define ERTS_PORT_SFLG_INVALID ((Uint32) (1 << 13)) /* Last port to terminate halts the emulator */ -#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 15)) +#define ERTS_PORT_SFLG_HALT ((Uint32) (1 << 14)) #ifdef DEBUG /* Only debug: make sure all flags aren't cleared unintentionally */ #define ERTS_PORT_SFLG_PORT_DEBUG ((Uint32) (1 << 31)) @@ -239,7 +238,6 @@ extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ /* Combinations of port status flags */ #define ERTS_PORT_SFLGS_DEAD \ (ERTS_PORT_SFLG_FREE \ - | ERTS_PORT_SFLG_FREE_SCHEDULED \ | ERTS_PORT_SFLG_INITIALIZING) #define ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP \ (ERTS_PORT_SFLGS_DEAD | ERTS_PORT_SFLG_INVALID) diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 112d27c94e..edcd3112da 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -33,6 +33,7 @@ #include "erl_port_task.h" #include "dist.h" #include "dtrace-wrapper.h" +#include #if defined(DEBUG) && 0 #define HARD_DEBUG @@ -41,18 +42,13 @@ /* * Costs in reductions for some port operations. */ -#define ERTS_PORT_REDS_EXECUTE 0 -#define ERTS_PORT_REDS_FREE 50 -#define ERTS_PORT_REDS_TIMEOUT 200 -#define ERTS_PORT_REDS_INPUT 200 -#define ERTS_PORT_REDS_OUTPUT 200 -#define ERTS_PORT_REDS_EVENT 200 -#define ERTS_PORT_REDS_TERMINATE 100 - - -#define ERTS_PORT_TASK_INVALID_PORT(P, ID) \ - ((erts_atomic32_read_acqb(&(P)->state) & ERTS_PORT_SFLGS_DEAD) \ - || (P)->common.id != (ID)) +#define ERTS_PORT_REDS_EXECUTE 10 +#define ERTS_PORT_REDS_FREE 100 +#define ERTS_PORT_REDS_TIMEOUT 400 +#define ERTS_PORT_REDS_INPUT 400 +#define ERTS_PORT_REDS_OUTPUT 400 +#define ERTS_PORT_REDS_EVENT 400 +#define ERTS_PORT_REDS_TERMINATE 200 #ifdef USE_VM_PROBES #define DTRACE_DRIVER(PROBE_NAME, PP) \ @@ -70,82 +66,87 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks; -struct ErtsPortTaskQueue_ { - ErtsPortTask *first; - ErtsPortTask *last; -}; +#define ERTS_PT_STATE_SCHEDULED 0 +#define ERTS_PT_STATE_ABORTED 1 +#define ERTS_PT_STATE_EXECUTING 2 struct ErtsPortTask_ { - ErtsPortTask *prev; - ErtsPortTask *next; - ErtsPortTaskQueue *queue; - ErtsPortTaskHandle *handle; + erts_smp_atomic32_t state; ErtsPortTaskType type; - ErlDrvEvent event; - ErlDrvEventData event_data; + union { + struct { + ErtsPortTask *next; + ErtsPortTaskHandle *handle; + union { + struct { /* I/O tasks */ + ErlDrvEvent event; + ErlDrvEventData event_data; + } io; + } u; + } alive; + ErtsThrPrgrLaterOp release; + } u; }; -#ifdef HARD_DEBUG -#define ERTS_PT_CHK_PORTQ(RQ) check_port_queue((RQ), NULL, 0) -#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP) check_port_queue((RQ), (PP), -1) -#define ERTS_PT_CHK_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 1) -#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP) check_port_queue((RQ), (PP), 0) -#define ERTS_PT_CHK_TASKQ(Q) check_task_queue((Q), NULL, 0) -#define ERTS_PT_CHK_IN_TASKQ(Q, T) check_task_queue((Q), (T), 1) -#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T) check_task_queue((Q), (T), 0) -static void -check_port_queue(Port *chk_pp, int inq); -static void -check_task_queue(ErtsPortTaskQueue *ptqp, - ErtsPortTask *chk_ptp, - int inq); -#else -#define ERTS_PT_CHK_PORTQ(RQ) -#define ERTS_PT_CHK_PRES_PORTQ(RQ, PP) -#define ERTS_PT_CHK_IN_PORTQ(RQ, PP) -#define ERTS_PT_CHK_NOT_IN_PORTQ(RQ, PP) -#define ERTS_PT_CHK_TASKQ(Q) -#define ERTS_PT_CHK_IN_TASKQ(Q, T) -#define ERTS_PT_CHK_NOT_IN_TASKQ(Q, T) -#endif - -static void handle_remaining_tasks(ErtsRunQueue *runq, Port *pp); +static void begin_port_cleanup(Port *pp, ErtsPortTask **execq); ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task, ErtsPortTask, - 200, + 1000, ERTS_ALC_T_PORT_TASK) -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_taskq, - ErtsPortTaskQueue, - 50, - ERTS_ALC_T_PORT_TASKQ) + +#ifdef ERTS_SMP +static void +call_port_task_free(void *vptp) +{ + port_task_free((ErtsPortTask *) vptp); +} +#endif + +static ERTS_INLINE void +schedule_port_task_free(ErtsPortTask *ptp) +{ +#ifdef ERTS_SMP + erts_schedule_thr_prgr_later_op(call_port_task_free, + (void *) ptp, + &ptp->u.release); +#else + port_task_free(ptp); +#endif +} /* * Task handle manipulation. */ +static ERTS_INLINE void +reset_port_task_handle(ErtsPortTaskHandle *pthp) +{ + erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL); +} + static ERTS_INLINE ErtsPortTask * handle2task(ErtsPortTaskHandle *pthp) { - return (ErtsPortTask *) erts_smp_atomic_read_nob(pthp); + return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp); } static ERTS_INLINE void reset_handle(ErtsPortTask *ptp) { - if (ptp->handle) { - ASSERT(ptp == handle2task(ptp->handle)); - erts_smp_atomic_set_nob(ptp->handle, (erts_aint_t) NULL); + if (ptp->u.alive.handle) { + ASSERT(ptp == handle2task(ptp->u.alive.handle)); + reset_port_task_handle(ptp->u.alive.handle); } } static ERTS_INLINE void set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) { - ptp->handle = pthp; + ptp->u.alive.handle = pthp; if (pthp) { - erts_smp_atomic_set_nob(pthp, (erts_aint_t) ptp); - ASSERT(ptp == handle2task(ptp->handle)); + erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp); + ASSERT(ptp == handle2task(ptp->u.alive.handle)); } } @@ -158,7 +159,6 @@ enqueue_port(ErtsRunQueue *runq, Port *pp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); pp->sched.next = NULL; - pp->sched.in_runq = 1; if (runq->ports.end) { ASSERT(runq->ports.start); runq->ports.end->sched.next = pp; @@ -196,287 +196,181 @@ pop_port(ErtsRunQueue *runq) return pp; } +/* + * Task queue operations + */ -#ifdef HARD_DEBUG - -static void -check_port_queue(ErtsRunQueue *runq, Port *chk_pp, int inq) +static ERTS_INLINE erts_aint32_t +enqueue_task(Port *pp, ErtsPortTask *ptp) { - Port *pp; - Port *last_pp; - Port *first_pp = runq->ports.start; - int no_forward = 0, no_backward = 0; - int found_forward = 0, found_backward = 0; - if (!first_pp) { - ASSERT(!runq->ports.end); - } - else { - ASSERT(!first_pp->sched.prev); - for (pp = first_pp; pp; pp = pp->sched.next) { - ASSERT(pp->sched.taskq); - if (pp->sched.taskq->first) - no_forward++; - if (chk_pp == pp) - found_forward = 1; - if (!pp->sched.prev) { - ASSERT(first_pp == pp); - } - if (!pp->sched.next) { - ASSERT(runq->ports.end == pp); - last_pp = pp; - } - } - for (pp = last_pp; pp; pp = pp->sched.prev) { - ASSERT(pp->sched.taskq); - if (pp->sched.taskq->last) - no_backward++; - if (chk_pp == pp) - found_backward = 1; - if (!pp->sched.prev) { - ASSERT(first_pp == pp); - } - if (!pp->sched.next) { - ASSERT(runq->ports.end == pp); - } - check_task_queue(pp->sched.taskq, NULL, 0); - } - ASSERT(no_forward == no_backward); - } - ASSERT(no_forward == RUNQ_READ_LEN(&runq->ports.info.len)); - if (chk_pp) { - if (chk_pp->sched.taskq || chk_pp->sched.exe_taskq) { - ASSERT(chk_pp->sched.taskq != chk_pp->sched.exe_taskq); - } - ASSERT(!chk_pp->sched.taskq || chk_pp->sched.taskq->first); - if (inq < 0) - inq = chk_pp->sched.taskq && !chk_pp->sched.exe_taskq; - if (inq) { - ASSERT(found_forward && found_backward); + erts_aint32_t flags; + ptp->u.alive.next = NULL; + erts_port_task_sched_lock(&pp->sched); + flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + if (!(flags & ERTS_PTS_FLG_EXIT)) { + if (pp->sched.taskq.in.last) { + ASSERT(pp->sched.taskq.in.first); + ASSERT(!pp->sched.taskq.in.last->u.alive.next); + + pp->sched.taskq.in.last->u.alive.next = ptp; } else { - ASSERT(!found_forward && !found_backward); + ASSERT(!pp->sched.taskq.in.first); + + pp->sched.taskq.in.first = ptp; } + pp->sched.taskq.in.last = ptp; } + erts_port_task_sched_unlock(&pp->sched); + return flags; } -#endif - -/* - * Task queue operations - */ - -static ERTS_INLINE ErtsPortTaskQueue * -port_taskq_init(ErtsPortTaskQueue *ptqp, Port *pp) +static ERTS_INLINE ErtsPortTask * +pop_task(Port *pp, ErtsPortTask **execqp) { - if (ptqp) { - ptqp->first = NULL; - ptqp->last = NULL; - } - return ptqp; -} + ErtsPortTask *ptp; -static ERTS_INLINE void -enqueue_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp) -{ - ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp); - ptp->next = NULL; - ptp->prev = ptqp->last; - ptp->queue = ptqp; - if (ptqp->last) { - ASSERT(ptqp->first); - ptqp->last->next = ptp; - } - else { - ASSERT(!ptqp->first); - ptqp->first = ptp; + ptp = *execqp; + if (ptp) { + *execqp = ptp->u.alive.next; + return ptp; } - ptqp->last = ptp; - ERTS_PT_CHK_IN_TASKQ(ptqp, ptp); + + ASSERT(!pp->sched.taskq.local); + + erts_port_task_sched_lock(&pp->sched); + ptp = pp->sched.taskq.in.first; + pp->sched.taskq.in.first = NULL; + pp->sched.taskq.in.last = NULL; + if (ptp) + *execqp = ptp->u.alive.next; + else + erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_TASKS); + erts_port_task_sched_unlock(&pp->sched); + + + return ptp; } static ERTS_INLINE void -push_task(ErtsPortTaskQueue *ptqp, ErtsPortTask *ptp) +prepare_exec(Port *pp, ErtsPortTask **execqp) { - ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp); - ptp->next = ptqp->first; - ptp->prev = NULL; - ptp->queue = ptqp; - if (ptqp->first) { - ASSERT(ptqp->last); - ptqp->first->prev = ptp; - } - else { - ASSERT(!ptqp->last); - ptqp->last = ptp; + erts_aint32_t act; + *execqp = pp->sched.taskq.local; + + /* guess a likely value */ + act = ERTS_PTS_FLG_HAVE_TASKS|ERTS_PTS_FLG_IN_RUNQ; + + while (1) { + erts_aint32_t new, exp; + + new = exp = act; + + new &= ~ERTS_PTS_FLG_IN_RUNQ; + new |= ERTS_PTS_FLG_EXEC; + + act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp); + + ASSERT(act & ERTS_PTS_FLG_IN_RUNQ); + + if (exp == act) + break; } - ptqp->first = ptp; - ERTS_PT_CHK_IN_TASKQ(ptqp, ptp); } -static ERTS_INLINE void -dequeue_task(ErtsPortTask *ptp) +/* finalize_exec() return value != 0 if port should remain active */ +static ERTS_INLINE int +finalize_exec(Port *pp, ErtsPortTask **execq) { - ASSERT(ptp); - ASSERT(ptp->queue); - ERTS_PT_CHK_IN_TASKQ(ptp->queue, ptp); - if (ptp->next) - ptp->next->prev = ptp->prev; - else { - ASSERT(ptp->queue->last == ptp); - ptp->queue->last = ptp->prev; - } - if (ptp->prev) - ptp->prev->next = ptp->next; - else { - ASSERT(ptp->queue->first == ptp); - ptp->queue->first = ptp->next; - } + erts_aint32_t act; - ASSERT(ptp->queue->first || !ptp->queue->last); - ASSERT(ptp->queue->last || !ptp->queue->first); - ERTS_PT_CHK_NOT_IN_TASKQ(ptp->queue, ptp); -} + pp->sched.taskq.local = *execq; + *execq = NULL; -static ERTS_INLINE ErtsPortTask * -pop_task(ErtsPortTaskQueue *ptqp) -{ - ErtsPortTask *ptp = ptqp->first; - if (!ptp) { - ASSERT(!ptqp->last); - } - else { - ERTS_PT_CHK_IN_TASKQ(ptqp, ptp); - ASSERT(!ptp->prev); - ptqp->first = ptp->next; - if (ptqp->first) - ptqp->first->prev = NULL; - else { - ASSERT(ptqp->last == ptp); - ptqp->last = NULL; - } - ASSERT(ptp->queue->first || !ptp->queue->last); - ASSERT(ptp->queue->last || !ptp->queue->first); - } - ERTS_PT_CHK_NOT_IN_TASKQ(ptqp, ptp); - return ptp; -} + /* guess a likely value */ + act = ERTS_PTS_FLG_EXEC; + if (execq) + act |= ERTS_PTS_FLG_HAVE_TASKS; -#ifdef HARD_DEBUG + while (1) { + erts_aint32_t new, exp; -static void -check_task_queue(ErtsPortTaskQueue *ptqp, - ErtsPortTask *chk_ptp, - int inq) -{ - ErtsPortTask *ptp; - ErtsPortTask *last_ptp; - ErtsPortTask *first_ptp = ptqp->first; - int found_forward = 0, found_backward = 0; - if (!first_ptp) { - ASSERT(!ptqp->last); - } - else { - ASSERT(!first_ptp->prev); - for (ptp = first_ptp; ptp; ptp = ptp->next) { - ASSERT(ptp->queue == ptqp); - if (chk_ptp == ptp) - found_forward = 1; - if (!ptp->prev) { - ASSERT(first_ptp == ptp); - } - if (!ptp->next) { - ASSERT(ptqp->last == ptp); - last_ptp = ptp; - } - } - for (ptp = last_ptp; ptp; ptp = ptp->prev) { - ASSERT(ptp->queue == ptqp); - if (chk_ptp == ptp) - found_backward = 1; - if (!ptp->prev) { - ASSERT(first_ptp == ptp); - } - if (!ptp->next) { - ASSERT(ptqp->last == ptp); - } - } - } - if (chk_ptp) { - if (inq) { - ASSERT(found_forward && found_backward); - } - else { - ASSERT(!found_forward && !found_backward); - } + new = exp = act; + + new &= ~ERTS_PTS_FLG_EXEC; + if (act & ERTS_PTS_FLG_HAVE_TASKS) + new |= ERTS_PTS_FLG_IN_RUNQ; + + act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); + + ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); + + if (exp == act) + break; } + + return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0; } -#endif /* * Abort a scheduled task. */ int -erts_port_task_abort(Eterm id, ErtsPortTaskHandle *pthp) +erts_port_task_abort(ErtsPortTaskHandle *pthp) { - ErtsRunQueue *runq; - ErtsPortTaskQueue *ptqp; + int res; ErtsPortTask *ptp; - Port *pp; - - pp = erts_port_lookup_raw(id); - if (!pp) - return 1; - - runq = erts_port_runq(pp); - if (!runq) - return 1; +#ifdef ERTS_SMP + ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); +#endif ptp = handle2task(pthp); + if (!ptp) + res = -1; + else { + erts_aint32_t old_state; + +#ifdef DEBUG + ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle; + ERTS_SMP_READ_MEMORY_BARRIER; + old_state = erts_smp_atomic32_read_nob(&ptp->state); + if (old_state == ERTS_PT_STATE_SCHEDULED) { + ASSERT(saved_pthp == pthp); + } +#endif - if (!ptp) { - erts_smp_runq_unlock(runq); - return 1; - } - - ASSERT(ptp->handle == pthp); - ptqp = ptp->queue; - ASSERT(pp == ptqp->port); + old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + ERTS_PT_STATE_ABORTED, + ERTS_PT_STATE_SCHEDULED); + if (old_state != ERTS_PT_STATE_SCHEDULED) + res = - 1; /* Task already aborted, executing, or executed */ + else { - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - ASSERT(ptqp); - ASSERT(ptqp->first); + reset_port_task_handle(pthp); - dequeue_task(ptp); - reset_handle(ptp); + switch (ptp->type) { + case ERTS_PORT_TASK_INPUT: + case ERTS_PORT_TASK_OUTPUT: + case ERTS_PORT_TASK_EVENT: + ASSERT(erts_smp_atomic_read_nob( + &erts_port_task_outstanding_io_tasks) > 0); + erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); + break; + default: + break; + } - switch (ptp->type) { - case ERTS_PORT_TASK_INPUT: - case ERTS_PORT_TASK_OUTPUT: - case ERTS_PORT_TASK_EVENT: - ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) > 0); - erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); - break; - default: - break; + res = 0; + } } - ASSERT(ptqp == pp->sched.taskq || ptqp == pp->sched.exe_taskq); - - if (ptqp->first || pp->sched.taskq != ptqp) - ptqp = NULL; - else - pp->sched.taskq = NULL; - - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - - erts_smp_runq_unlock(runq); - - port_task_free(ptp); - if (ptqp) - port_taskq_free(ptqp); +#ifdef ERTS_SMP + erts_thr_progress_unmanaged_continue(dhndl); +#endif - return 0; + return res; } /* @@ -487,242 +381,209 @@ int erts_port_task_schedule(Eterm id, ErtsPortTaskHandle *pthp, ErtsPortTaskType type, - ErlDrvEvent event, - ErlDrvEventData event_data) + ...) { +#ifdef ERTS_SMP + ErtsRunQueue *xrunq; + ErtsThrPrgrDelayHandle dhndl; +#endif ErtsRunQueue *runq; Port *pp; - ErtsPortTask *ptp; - int enq_port = 0; - - /* - * NOTE: We might not have the port lock here. We are only - * allowed to access the 'sched', 'tab_status', - * and 'id' fields of the port struct while - * tasks_lock is held. - */ + ErtsPortTask *ptp = NULL; + erts_aint32_t act; if (pthp && erts_port_task_is_scheduled(pthp)) { ASSERT(0); - erts_port_task_abort(id, pthp); + erts_port_task_abort(pthp); } - ptp = port_task_alloc(); - ASSERT(is_internal_port(id)); - pp = erts_port_lookup_raw(id); - if (!pp) - return -1; - - runq = erts_port_runq(pp); - - if (!runq || ERTS_PORT_TASK_INVALID_PORT(pp, id)) { - if (runq) - erts_smp_runq_unlock(runq); - return -1; - } - - ASSERT(!erts_port_task_is_scheduled(pthp)); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); +#ifdef ERTS_SMP + dhndl = erts_thr_progress_unmanaged_delay(); +#endif - if (!pp->sched.taskq) { - pp->sched.taskq = port_taskq_init(port_taskq_alloc(), pp); - enq_port = !pp->sched.in_runq && !pp->sched.exe_taskq; - } + pp = erts_port_lookup_raw(id); #ifdef ERTS_SMP - if (enq_port) { - ErtsRunQueue *xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); - if (xrunq) { - /* Port emigrated ... */ - erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); - erts_smp_runq_unlock(runq); - runq = erts_port_runq(pp); - if (!runq) - return -1; - } + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { + if (pp) + erts_port_inc_refc(pp); + erts_thr_progress_unmanaged_continue(dhndl); } #endif - ASSERT(pp->sched.taskq); - ASSERT(ptp); + if (!pp) + goto fail; + + ptp = port_task_alloc(); ptp->type = type; - ptp->event = event; - ptp->event_data = event_data; - set_handle(ptp, pthp); + erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); switch (type) { - case ERTS_PORT_TASK_FREE: - erl_exit(ERTS_ABORT_EXIT, - "erts_port_task_schedule(): Cannot schedule free task\n"); - break; case ERTS_PORT_TASK_INPUT: - case ERTS_PORT_TASK_OUTPUT: - case ERTS_PORT_TASK_EVENT: + case ERTS_PORT_TASK_OUTPUT: { + va_list argp; + va_start(argp, type); + ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent); + va_end(argp); erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); - /* Fall through... */ + break; + } + case ERTS_PORT_TASK_EVENT: { + va_list argp; + va_start(argp, type); + ptp->u.alive.u.io.event = va_arg(argp, ErlDrvEvent); + ptp->u.alive.u.io.event_data = va_arg(argp, ErlDrvEventData); + va_end(argp); + erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); + break; + } default: - enqueue_task(pp->sched.taskq, ptp); break; } -#ifndef ERTS_SMP - /* - * When (!enq_port && !pp->sched.exe_taskq) is true in the smp case, - * the port might not be in the run queue. If this is the case, another - * thread is in the process of enqueueing the port. This very seldom - * occur, but do occur and is a valid scenario. Debug info showing this - * enqueue in progress must be introduced before we can enable (modified - * versions of these) assertions in the smp case again. - */ -#if defined(HARD_DEBUG) - if (pp->sched.exe_taskq || enq_port) - ERTS_PT_CHK_NOT_IN_PORTQ(runq, pp); - else - ERTS_PT_CHK_IN_PORTQ(runq, pp); -#elif defined(DEBUG) - if (!enq_port && !pp->sched.exe_taskq) { - /* We should be in port run q */ - ASSERT(pp->sched.in_runq); - } -#endif -#endif + set_handle(ptp, pthp); - if (!enq_port) { - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - erts_smp_runq_unlock(runq); + act = enqueue_task(pp, ptp); + if (act & ERTS_PTS_FLG_EXIT) { + reset_handle(ptp); + goto fail; } - else { - enqueue_port(runq, pp); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - - if (erts_system_profile_flags.runnable_ports) { - profile_runnable_port(pp, am_active); + + while (1) { + erts_aint32_t new, exp; + + if ((act & ERTS_PTS_FLG_HAVE_TASKS) + && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + goto done; /* Done */ + + new = exp = act; + new |= ERTS_PTS_FLG_HAVE_TASKS; + if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + new |= ERTS_PTS_FLG_IN_RUNQ; + + act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); + + if (exp == act) { + if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + break; /* Need to enqueue port */ + goto done; /* Done */ } + if (act & ERTS_PTS_FLG_EXIT) + goto done; /* Died after our task insert... */ + } + + /* Enqueue port on run-queue */ + + runq = erts_port_runq(pp); + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + +#ifdef ERTS_SMP + xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); + if (xrunq) { + /* Port emigrated ... */ + erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); erts_smp_runq_unlock(runq); + runq = erts_port_runq(pp); + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + } +#endif - erts_smp_notify_inc_runq(runq); + enqueue_port(runq, pp); + + if (erts_system_profile_flags.runnable_ports) { + profile_runnable_port(pp, am_active); } + + erts_smp_runq_unlock(runq); + + erts_smp_notify_inc_runq(runq); + +done: + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(pp); +#endif + return 0; + +fail: + +#ifdef ERTS_SMP + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_port_dec_refc(pp); +#endif + + if (ptp) + port_task_free(ptp); + + return -1; } void erts_port_task_free_port(Port *pp) { + erts_aint32_t flags; ErtsRunQueue *runq; - ErtsPortTaskQueue *ptqp; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); + runq = erts_port_runq(pp); - ASSERT(runq); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - ptqp = pp->sched.exe_taskq; - if (ptqp) { - /* I (this thread) am currently executing this port, free it - when scheduled out... */ - ErtsPortTask *ptp; - enqueue_free: - ptp = port_task_alloc(); - erts_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - ptp->type = ERTS_PORT_TASK_FREE; - ptp->event = (ErlDrvEvent) -1; - ptp->event_data = NULL; - set_handle(ptp, NULL); - push_task(ptqp, ptp); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - erts_smp_runq_unlock(runq); - } - else { - if (pp->sched.in_runq) { - ptqp = pp->sched.taskq; - if (!ptqp) - pp->sched.taskq = ptqp = port_taskq_init(port_taskq_alloc(), pp); - goto enqueue_free; - } - ASSERT(!pp->sched.taskq); - erts_atomic32_read_bset_relb(&pp->state, - (ERTS_PORT_SFLG_CLOSING - | ERTS_PORT_SFLG_FREE_SCHEDULED), - ERTS_PORT_SFLG_FREE_SCHEDULED); - handle_remaining_tasks(runq, pp); /* May release runq lock */ - ASSERT(!pp->sched.exe_taskq && (!ptqp || !ptqp->first)); - pp->sched.taskq = NULL; - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - erts_smp_runq_unlock(runq); -#ifndef ERTS_SMP - pp->cleanup = 1; -#endif - } -} + if (!runq) + ERTS_INTERNAL_ERROR("Missing run-queue"); + erts_port_task_sched_lock(&pp->sched); + flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + ERTS_PTS_FLG_EXIT); + erts_port_task_sched_unlock(&pp->sched); + erts_atomic32_read_bset_relb(&pp->state, + (ERTS_PORT_SFLG_CLOSING + | ERTS_PORT_SFLG_FREE), + ERTS_PORT_SFLG_FREE); -typedef struct { - ErtsRunQueue *runq; - int *resp; -} ErtsPortTaskExeBlockData; + erts_smp_runq_unlock(runq); + + if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) + begin_port_cleanup(pp, NULL); +} /* - * Run all scheduled tasks for the first port in run queue. If - * new tasks appear while running reschedule port (free task is - * an exception; it is always handled instantly). + * Execute scheduled tasks of a port. * * erts_port_task_execute() is called by scheduler threads between - * scheduleing of processes. Sched lock should be held by caller. + * scheduling of processes. Run-queue lock should be held by caller. */ int erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; - ErtsPortTaskQueue *ptqp; - ErtsPortTask *ptp; + ErtsPortTask *execq; int res = 0; int reds = ERTS_PORT_REDS_EXECUTE; erts_aint_t io_tasks_executed = 0; int fpe_was_unmasked; + erts_aint32_t state; + int active; ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_PT_CHK_PORTQ(runq); - pp = pop_port(runq); if (!pp) { res = 0; goto done; } - if (erts_smp_port_trylock(pp) == EBUSY) { - erts_smp_runq_unlock(runq); - erts_smp_port_lock(pp); - erts_smp_runq_lock(runq); - } - - ASSERT(pp->sched.in_runq); - pp->sched.in_runq = 0; - - if (!pp->sched.taskq) { - if (erts_system_profile_flags.runnable_ports) - profile_runnable_port(pp, am_inactive); - res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); - goto release_port_done; - } + erts_smp_runq_unlock(runq); *curr_port_pp = pp; - - ASSERT(pp->sched.taskq->first); - ptqp = pp->sched.taskq; - pp->sched.taskq = NULL; - - ASSERT(!pp->sched.exe_taskq); - pp->sched.exe_taskq = ptqp; if (erts_sched_stat.enabled) { ErtsSchedulerData *esdp = erts_get_scheduler_data(); @@ -739,49 +600,43 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_spin_unlock(&erts_sched_stat.lock); } + prepare_exec(pp, &execq); + + erts_smp_port_lock(pp); + /* trace port scheduling, in */ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { trace_sched_ports(pp, am_in); } - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + fpe_was_unmasked = erts_block_fpe(); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - ptp = pop_task(ptqp); + state = erts_atomic_read_nob(&pp->state); + goto begin_handle_tasks; - fpe_was_unmasked = erts_block_fpe(); + while (1) { + erts_aint32_t task_state; + ErtsPortTask *ptp; - while (ptp) { - ASSERT(pp->sched.taskq != pp->sched.exe_taskq); + ptp = pop_task(pp, &execq); + if (!ptp) + break; + + task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + ERTS_PT_STATE_EXECUTING, + ERTS_PT_STATE_SCHEDULED); + if (task_state != ERTS_PT_STATE_SCHEDULED) { + ASSERT(task_state == ERTS_PT_STATE_ABORTED); + goto aborted_port_task; + } reset_handle(ptp); - erts_smp_runq_unlock(runq); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); ERTS_SMP_CHK_NO_PROC_LOCKS; ASSERT(pp->drv_ptr); switch (ptp->type) { - case ERTS_PORT_TASK_FREE: /* May be pushed in q at any time */ - reds += ERTS_PORT_REDS_FREE; - erts_smp_runq_lock(runq); - - erts_unblock_fpe(fpe_was_unmasked); - ASSERT(erts_atomic32_read_nob(&pp->state) - & ERTS_PORT_SFLG_FREE_SCHEDULED); - if (ptqp->first || (pp->sched.taskq && pp->sched.taskq->first)) - handle_remaining_tasks(runq, pp); - ASSERT(!ptqp->first - && (!pp->sched.taskq || !pp->sched.taskq->first)); - - port_task_free(ptp); - if (pp->sched.taskq) - port_taskq_free(pp->sched.taskq); - pp->sched.taskq = NULL; -#ifndef ERTS_SMP - pp->cleanup = 1; -#endif - goto tasks_done; case ERTS_PORT_TASK_TIMEOUT: reds += ERTS_PORT_REDS_TIMEOUT; if (!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)) { @@ -795,7 +650,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); /* NOTE some windows drivers use ->ready_input for input and output */ - (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->event); + (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, + ptp->u.alive.u.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: @@ -803,7 +659,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); - (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->event); + (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, + ptp->u.alive.u.io.event); io_tasks_executed++; break; case ERTS_PORT_TASK_EVENT: @@ -811,7 +668,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ASSERT((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_event, pp); - (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, ptp->event, ptp->event_data); + (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, + ptp->u.alive.u.io.event, + ptp->u.alive.u.io.event_data); io_tasks_executed++; break; case ERTS_PORT_TASK_DIST_CMD: @@ -824,10 +683,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; } - if ((erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_CLOSING) - && erts_is_port_ioq_empty(pp)) { + state = erts_atomic32_read_nob(&pp->state); + if ((state & ERTS_PORT_SFLG_CLOSING) && erts_is_port_ioq_empty(pp)) { reds += ERTS_PORT_REDS_TERMINATE; erts_terminate_port(pp); + state = erts_atomic32_read_nob(&pp->state); } ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); @@ -840,17 +700,29 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - port_task_free(ptp); + aborted_port_task: + schedule_port_task_free(ptp); - erts_smp_runq_lock(runq); + begin_handle_tasks: + if (state & ERTS_PORT_SFLG_FREE) { + reds += ERTS_PORT_REDS_FREE; - ptp = pop_task(ptqp); - } + begin_port_cleanup(pp, &execq); - tasks_done: + break; + } + + if (reds >= CONTEXT_REDS) + break; + } erts_unblock_fpe(fpe_was_unmasked); + /* trace port scheduling, out */ + if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { + trace_sched_ports(pp, am_out); + } + if (io_tasks_executed) { ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) >= io_tasks_executed); @@ -858,15 +730,19 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) -1*io_tasks_executed); } - *curr_port_pp = NULL; - #ifdef ERTS_SMP ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); #endif - if (!pp->sched.taskq) { - ASSERT(pp->sched.exe_taskq); - pp->sched.exe_taskq = NULL; + active = finalize_exec(pp, &execq); + + erts_port_release(pp); + + *curr_port_pp = NULL; + + erts_smp_runq_lock(runq); + + if (!active) { if (erts_system_profile_flags.runnable_ports) profile_runnable_port(pp, am_inactive); } @@ -876,15 +752,12 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) #endif ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); - ASSERT(pp->sched.taskq->first); #ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); if (!xrunq) { #endif enqueue_port(runq, pp); - ASSERT(pp->sched.exe_taskq); - pp->sched.exe_taskq = NULL; /* No need to notify ourselves about inc in runq. */ #ifdef ERTS_SMP } @@ -894,35 +767,20 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_smp_runq_unlock(runq); xrunq = erts_port_runq(pp); - if (xrunq) { - enqueue_port(xrunq, pp); - ASSERT(pp->sched.exe_taskq); - pp->sched.exe_taskq = NULL; - erts_smp_runq_unlock(xrunq); - erts_smp_notify_inc_runq(xrunq); - } + ASSERT(xrunq); + enqueue_port(xrunq, pp); + erts_smp_runq_unlock(xrunq); + erts_smp_notify_inc_runq(xrunq); erts_smp_runq_lock(runq); } #endif } + done: res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) != (erts_aint_t) 0); - ERTS_PT_CHK_PRES_PORTQ(runq, pp); - - port_taskq_free(ptqp); - - /* trace port scheduling, out */ - if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { - trace_sched_ports(pp, am_out); - } - -release_port_done: - erts_port_release(pp); - - done: ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds); @@ -930,47 +788,83 @@ release_port_done: return res; } -/* - * Handle remaining tasks after a free task. - */ +#ifdef ERTS_SMP +static void +release_port(void *vport) +{ + erts_port_dec_refc((Port *) vport); +} +#endif static void -handle_remaining_tasks(ErtsRunQueue *runq, Port *pp) +begin_port_cleanup(Port *pp, ErtsPortTask **execqp) { - int i; - ErtsPortTask *ptp; - ErtsPortTaskQueue *ptqps[] = {pp->sched.exe_taskq, pp->sched.taskq}; + int i, max; + ErtsPortTask *qs[2]; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - for (i = 0; i < sizeof(ptqps)/sizeof(ErtsPortTaskQueue *); i++) { - if (!ptqps[i]) - continue; - ptp = pop_task(ptqps[i]); - while (ptp) { + /* + * Handle remaining tasks... + */ + + max = 0; + if (execqp && *execqp) { + qs[max++] = *execqp; + *execqp = NULL; + } + + erts_port_task_sched_lock(&pp->sched); + qs[max] = pp->sched.taskq.in.first; + pp->sched.taskq.in.first = NULL; + pp->sched.taskq.in.last = NULL; + erts_port_task_sched_unlock(&pp->sched); + if (qs[max]) + max++; + + for (i = 0; i < max; i++) { + while (1) { + erts_aint32_t state; + ErtsPortTask *ptp = qs[i]; + if (!ptp) + break; + + qs[i] = ptp->u.alive.next; + + /* Normal case here is aborted tasks... */ + state = erts_smp_atomic32_read_nob(&ptp->state); + if (state == ERTS_PT_STATE_ABORTED) + goto aborted_port_task; + + state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + ERTS_PT_STATE_EXECUTING, + ERTS_PT_STATE_SCHEDULED); + if (state != ERTS_PT_STATE_SCHEDULED) { + ASSERT(state == ERTS_PT_STATE_ABORTED); + goto aborted_port_task; + } + reset_handle(ptp); - erts_smp_runq_unlock(runq); switch (ptp->type) { - case ERTS_PORT_TASK_FREE: case ERTS_PORT_TASK_TIMEOUT: break; case ERTS_PORT_TASK_INPUT: erts_stale_drv_select(pp->common.id, - ptp->event, + ptp->u.alive.u.io.event, DO_READ, 1); break; case ERTS_PORT_TASK_OUTPUT: erts_stale_drv_select(pp->common.id, - ptp->event, + ptp->u.alive.u.io.event, DO_WRITE, 1); break; case ERTS_PORT_TASK_EVENT: erts_stale_drv_select(pp->common.id, - ptp->event, + ptp->u.alive.u.io.event, 0, 1); break; @@ -982,35 +876,41 @@ handle_remaining_tasks(ErtsRunQueue *runq, Port *pp) (int) ptp->type); } - port_task_free(ptp); - - erts_smp_runq_lock(runq); - ptp = pop_task(ptqps[i]); + aborted_port_task: + schedule_port_task_free(ptp); } } - ASSERT(!pp->sched.taskq || !pp->sched.taskq->first); + erts_smp_atomic32_read_band_nob(&pp->sched.flags, + ~ERTS_PTS_FLG_HAVE_TASKS); + + /* + * Schedule cleanup of port structure... + */ +#ifdef ERTS_SMP + erts_schedule_thr_prgr_later_op(release_port, + (void *) pp, + &pp->common.u.release); +#else + pp->cleanup = 1; +#endif } int erts_port_is_scheduled(Port *pp) { - int res; - ErtsRunQueue *runq = erts_port_runq(pp); - if (!runq) - return 0; - res = pp->sched.taskq || pp->sched.exe_taskq; - erts_smp_runq_unlock(runq); - return res; + erts_aint32_t flags = erts_smp_atomic32_read_acqb(&pp->sched.flags); + return (flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) != 0; } #ifdef ERTS_SMP + void erts_enqueue_port(ErtsRunQueue *rq, Port *pp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); - ASSERT(pp->sched.in_runq); + ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ); enqueue_port(rq, pp); } @@ -1022,7 +922,8 @@ erts_dequeue_port(ErtsRunQueue *rq) pp = pop_port(rq); ASSERT(!pp || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); - ASSERT(!pp || pp->sched.in_runq); + ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags) + & ERTS_PTS_FLG_IN_RUNQ)); return pp; } @@ -1037,5 +938,4 @@ erts_port_task_init(void) erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks, (erts_aint_t) 0); init_port_task_alloc(); - init_port_taskq_alloc(); } diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h index fd88b1c1ff..3e87d5b778 100644 --- a/erts/emulator/beam/erl_port_task.h +++ b/erts/emulator/beam/erl_port_task.h @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2006-2011. All Rights Reserved. + * Copyright Ericsson AB 2006-2012. All Rights Reserved. * * The contents of this file are subject to the Erlang Public License, * Version 1.1, (the "License"); you may not use this file except in @@ -44,7 +44,6 @@ typedef erts_smp_atomic_t ErtsPortTaskHandle; #endif typedef enum { - ERTS_PORT_TASK_FREE, ERTS_PORT_TASK_INPUT, ERTS_PORT_TASK_OUTPUT, ERTS_PORT_TASK_EVENT, @@ -57,19 +56,36 @@ typedef enum { extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks; #endif +#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0) +#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1) +#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2) +#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3) + typedef struct ErtsPortTask_ ErtsPortTask; -typedef struct ErtsPortTaskQueue_ ErtsPortTaskQueue; typedef struct { Port *next; - int in_runq; - ErtsPortTaskQueue *taskq; - ErtsPortTaskQueue *exe_taskq; + struct { + ErtsPortTask *local; + struct { + ErtsPortTask *first; + ErtsPortTask *last; + } in; + } taskq; + erts_smp_atomic32_t flags; +#ifdef ERTS_SMP + erts_mtx_t mtx; +#endif } ErtsPortTaskSched; ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp); ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp); -ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp); +ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp, + Eterm id); +ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp); +ERTS_GLB_INLINE void erts_port_task_sched_lock(ErtsPortTaskSched *ptsp); +ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp); + #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void); #endif @@ -89,12 +105,40 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp) } ERTS_GLB_INLINE void -erts_port_task_init_sched(ErtsPortTaskSched *ptsp) +erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id) { ptsp->next = NULL; - ptsp->in_runq = 0; - ptsp->taskq = NULL; - ptsp->exe_taskq = NULL; + ptsp->taskq.local = NULL; + ptsp->taskq.in.first = NULL; + ptsp->taskq.in.last = NULL; + erts_smp_atomic32_init_nob(&ptsp->flags, 0); +#ifdef ERTS_SMP + erts_mtx_init_x(&ptsp->mtx, "port_sched_lock", instr_id); +#endif +} + +ERTS_GLB_INLINE void +erts_port_task_sched_lock(ErtsPortTaskSched *ptsp) +{ +#ifdef ERTS_SMP + erts_mtx_lock(&ptsp->mtx); +#endif +} + +ERTS_GLB_INLINE void +erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp) +{ +#ifdef ERTS_SMP + erts_mtx_unlock(&ptsp->mtx); +#endif +} + +ERTS_GLB_INLINE void +erts_port_task_fini_sched(ErtsPortTaskSched *ptsp) +{ +#ifdef ERTS_SMP + erts_mtx_destroy(&ptsp->mtx); +#endif } #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS @@ -115,12 +159,11 @@ int erts_port_task_execute(ErtsRunQueue *, Port **); void erts_port_task_init(void); #endif -int erts_port_task_abort(Eterm id, ErtsPortTaskHandle *); +int erts_port_task_abort(ErtsPortTaskHandle *); int erts_port_task_schedule(Eterm, ErtsPortTaskHandle *, ErtsPortTaskType, - ErlDrvEvent, - ErlDrvEventData); + ...); void erts_port_task_free_port(Port *); int erts_port_is_scheduled(Port *); diff --git a/erts/emulator/beam/erl_ptab.c b/erts/emulator/beam/erl_ptab.c index 7ab756ae8d..8dcdf94c80 100644 --- a/erts/emulator/beam/erl_ptab.c +++ b/erts/emulator/beam/erl_ptab.c @@ -415,11 +415,6 @@ last_data_cmp(Uint64 ld1, Uint64 ld2) #define ERTS_PTAB_LastData2EtermData(LD) \ ((Eterm) ((LD) & ~(~((Uint64) 0) << ERTS_PTAB_ID_DATA_SIZE))) -static void noop(void *unused) -{ - -} - void erts_ptab_init_table(ErtsPTab *ptab, ErtsAlcType_t atype, @@ -484,10 +479,6 @@ erts_ptab_init_table(ErtsPTab *ptab, ptab->r.o.invalid_element = invalid_element; ptab->r.o.invalid_data = erts_ptab_id2data(ptab, invalid_element->id); ptab->r.o.release_element = release_element; - if (release_element) - ptab->r.o.release_element = release_element; - else - ptab->r.o.release_element = noop; erts_smp_interval_init(&ptab->list.data.interval); ptab->list.data.deleted.start = NULL; @@ -678,9 +669,10 @@ erts_ptab_delete_element(ErtsPTab *ptab, erts_ptab_rwunlock(ptab); } - erts_schedule_thr_prgr_later_op(ptab->r.o.release_element, - (void *) ptab_el, - &ptab_el->u.release); + if (ptab->r.o.release_element) + erts_schedule_thr_prgr_later_op(ptab->r.o.release_element, + (void *) ptab_el, + &ptab_el->u.release); } /* diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 1177e7e691..78c27b1385 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -197,19 +197,58 @@ erts_lc_is_port_locked(Port *prt) static void initq(Port* prt); +#if defined(ERTS_ENABLE_LOCK_CHECK) || defined(ERTS_ENABLE_LOCK_COUNT) +#define ERTS_PORT_INIT_INSTR_NEED_ID 1 +#else +#define ERTS_PORT_INIT_INSTR_NEED_ID 0 +#endif + +static ERTS_INLINE void port_init_instr(Port *prt +#if ERTS_PORT_INIT_INSTR_NEED_ID + , Eterm id +#endif + ) +{ +#if !ERTS_PORT_INIT_INSTR_NEED_ID + Eterm id = NIL; /* Not used */ +#endif + + /* + * Stuff that need to be initialized with the port id + * in the instrumented case, but not in the normal case. + */ +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + if (!prt->drv_ptr->lock) + erts_mtx_init_locked_x(prt->lock, "port_lock", id); +#endif + erts_port_task_init_sched(&prt->sched, id); +} + +#if !ERTS_PORT_INIT_INSTR_NEED_ID +static ERTS_INLINE void port_init_instr_abort(Port *prt) +{ +#ifdef ERTS_SMP + ASSERT(prt->drv_ptr && prt->lock); + if (!prt->drv_ptr->lock) { + erts_mtx_unlock(prt->lock); + erts_mtx_destroy(prt->lock); + } +#endif + erts_port_task_fini_sched(&prt->sched); +} +#endif + static void insert_port_struct(void *vprt, Eterm data) { Port *prt = (Port *) vprt; Eterm id = make_internal_port(data); -#ifdef ERTS_SMP - ASSERT(prt->drv_ptr && prt->lock); +#if ERTS_PORT_INIT_INSTR_NEED_ID /* - * We are breaking lock order in the port specific locking - * case. This is, however, safe since the lock has not been - * published, yet. + * This cannot be done earlier in the instrumented + * case since we don't now 'id' until now. */ - if (!prt->drv_ptr->lock) - erts_mtx_init_locked_x(prt->lock, "port_lock", id); + port_init_instr(prt, id); #endif prt->common.id = id; erts_atomic32_init_relb(&prt->state, ERTS_PORT_SFLG_INITIALIZING); @@ -292,7 +331,6 @@ static Port *create_port(char *name, sys_memset(&prt->common.u.alive.tm, 0, sizeof(ErlTimer)); #endif erts_port_task_handle_init(&prt->timeout_task); - erts_port_task_init_sched(&prt->sched); prt->psd = NULL; prt->drv_data = (SWord) 0; @@ -301,10 +339,23 @@ static Port *create_port(char *name, ASSERT(((char *) prt) == ((char *) &prt->common)); +#if !ERTS_PORT_INIT_INSTR_NEED_ID + /* + * When 'id' isn't needed (the normal case), it is better to + * do the initialization here avoiding unnecessary contention + * on table... + */ + port_init_instr(prt); +#endif + if (!erts_ptab_new_element(&erts_port, &prt->common, (void *) prt, insert_port_struct)) { + +#if !ERTS_PORT_INIT_INSTR_NEED_ID + port_init_instr_abort(prt); +#endif #ifdef ERTS_SMP if (driver_lock) erts_mtx_unlock(driver_lock); @@ -343,10 +394,9 @@ erts_port_free(Port *prt) #if defined(ERTS_SMP) || defined(ERTS_ENABLE_LOCK_CHECK) erts_aint32_t state = erts_atomic32_read_nob(&prt->state); #endif - ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_FREE_SCHEDULED - | ERTS_PORT_SFLG_INITIALIZING)); + ERTS_LC_ASSERT(state & (ERTS_PORT_SFLG_INITIALIZING + | ERTS_PORT_SFLG_FREE)); ASSERT(state & ERTS_PORT_SFLG_PORT_DEBUG); - ERTS_LC_ASSERT(!(state & ERTS_PORT_SFLG_FREE)); #ifdef ERTS_SMP ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->common.refc) == 0); @@ -354,6 +404,8 @@ erts_port_free(Port *prt) ERTS_LC_ASSERT(erts_atomic32_read_nob(&prt->refc) == 0); #endif + erts_port_task_fini_sched(&prt->sched); + #ifdef ERTS_SMP ASSERT(prt->lock); if (state & ERTS_PORT_SFLG_PORT_SPECIFIC_LOCK) @@ -372,7 +424,6 @@ erts_port_free(Port *prt) if (prt->drv_ptr->handle) erts_ddll_dereference_driver(prt->drv_ptr->handle); #endif - erts_atomic32_set_nob(&prt->state, ERTS_PORT_SFLG_FREE); erts_free(ERTS_ALC_T_PORT, prt); } @@ -1230,14 +1281,6 @@ int erts_write_to_port(Eterm caller_id, Port *p, Eterm list) } } -#ifdef ERTS_SMP -static void -release_port(void *vport) -{ - erts_port_dec_refc((Port *) vport); -} -#endif - void erts_init_io(int port_tab_size, int port_tab_size_ignore_files) { @@ -1267,11 +1310,7 @@ void erts_init_io(int port_tab_size, erts_ptab_init_table(&erts_port, ERTS_ALC_T_PORT_TABLE, -#ifdef ERTS_SMP - release_port, -#else NULL, -#endif (ErtsPTabElementCommon *) &erts_invalid_port.common, port_tab_size, "port_table"); @@ -2727,11 +2766,9 @@ static void schedule_port_timeout(Port *p) * /Rickard */ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(p)); - (void) erts_port_task_schedule(p->common.id, - &p->timeout_task, - ERTS_PORT_TASK_TIMEOUT, - (ErlDrvEvent) -1, - NULL); + erts_port_task_schedule(p->common.id, + &p->timeout_task, + ERTS_PORT_TASK_TIMEOUT); } ErlDrvTermData driver_mk_term_nil(void) @@ -4198,7 +4235,7 @@ drv_cancel_timer(Port *prt) erts_cancel_timer(&prt->common.u.alive.tm); #endif if (erts_port_task_is_scheduled(&prt->timeout_task)) - erts_port_task_abort(prt->common.id, &prt->timeout_task); + erts_port_task_abort(&prt->timeout_task); } int driver_set_timer(ErlDrvPort ix, unsigned long t) diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index 12313f0984..fd6f34e96e 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -123,6 +123,16 @@ typedef ERTS_SYS_FD_TYPE ErtsSysFdType; # define ERTS_DECLARE_DUMMY(X) X #endif +#if !defined(__func__) +# if !defined(__STDC_VERSION__) || __STDC_VERSION__ < 199901L +# if !defined(__GNUC__) || __GNUC__ < 2 +# define __func__ "[unknown_function]" +# else +# define __func__ __FUNCTION__ +# endif +# endif +#endif + #if defined(DEBUG) || defined(ERTS_ENABLE_LOCK_CHECK) # undef ERTS_CAN_INLINE # define ERTS_CAN_INLINE 0 @@ -553,6 +563,10 @@ __decl_noreturn void __noreturn erl_exit(int n, char*, ...); #define ERTS_ABORT_EXIT (INT_MIN + 1) /* no crash dump; only abort() */ #define ERTS_DUMP_EXIT (INT_MIN + 2) /* crash dump; then exit() */ +#define ERTS_INTERNAL_ERROR(What) \ + erl_exit(ERTS_ABORT_EXIT, "%s:%d:%s(): Internal error: %s", \ + __FILE__, __LINE__, __func__, What) + Eterm erts_check_io_info(void *p); /* Size of misc memory allocated from system dependent code */ diff --git a/erts/emulator/sys/common/erl_check_io.c b/erts/emulator/sys/common/erl_check_io.c index 9ef861bfe4..9c78fe713b 100644 --- a/erts/emulator/sys/common/erl_check_io.c +++ b/erts/emulator/sys/common/erl_check_io.c @@ -366,7 +366,7 @@ abort_task(Eterm id, ErtsPortTaskHandle *pthp, EventStateType type) || !erts_port_task_is_scheduled(pthp)); } else if (erts_port_task_is_scheduled(pthp)) { - erts_port_task_abort(id, pthp); + erts_port_task_abort(pthp); ASSERT(erts_is_port_alive(id)); } } @@ -1088,8 +1088,7 @@ iready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &state->driver.select->intask, ERTS_PORT_TASK_INPUT, - (ErlDrvEvent) state->fd, - NULL) != 0) { + (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_READ); } } @@ -1100,8 +1099,7 @@ oready(Eterm id, ErtsDrvEventState *state) if (erts_port_task_schedule(id, &state->driver.select->outtask, ERTS_PORT_TASK_OUTPUT, - (ErlDrvEvent) state->fd, - NULL) != 0) { + (ErlDrvEvent) state->fd) != 0) { stale_drv_select(id, state, ERL_DRV_WRITE); } } -- cgit v1.2.3