diff options
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r-- | erts/emulator/beam/erl_port_task.c | 559 |
1 files changed, 275 insertions, 284 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c index 31d9a1e26e..a588477320 100644 --- a/erts/emulator/beam/erl_port_task.c +++ b/erts/emulator/beam/erl_port_task.c @@ -1,18 +1,19 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2006-2013. All Rights Reserved. + * Copyright Ericsson AB 2006-2017. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -32,7 +33,10 @@ #include "global.h" #include "erl_port_task.h" #include "dist.h" +#include "erl_check_io.h" #include "dtrace-wrapper.h" +#include "lttng-wrapper.h" +#include "erl_check_io.h" #include <stdarg.h> /* @@ -67,8 +71,25 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q #else #define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0) #endif +#ifdef USE_LTTNG_VM_TRACEPOINTS +#define LTTNG_DRIVER(TRACEPOINT, PP) \ + if (LTTNG_ENABLED(TRACEPOINT)) { \ + lttng_decl_portbuf(port_str); \ + lttng_decl_procbuf(proc_str); \ + lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str); \ + lttng_port_to_str((PP), port_str); \ + LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name); \ + } +#else +#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0) +#endif -erts_smp_atomic_t erts_port_task_outstanding_io_tasks; +#define ERTS_LC_VERIFY_RQ(RQ, PP) \ + do { \ + ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); \ + ERTS_LC_ASSERT((RQ) == ((ErtsRunQueue *) \ + erts_atomic_read_nob(&(PP)->run_queue))); \ + } while (0) #define ERTS_PT_STATE_SCHEDULED 0 #define ERTS_PT_STATE_ABORTED 1 @@ -77,7 +98,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks; typedef union { struct { /* I/O tasks */ ErlDrvEvent event; - ErlDrvEventData event_data; } io; struct { ErtsProc2PortSigCallback callback; @@ -86,7 +106,7 @@ typedef union { } ErtsPortTaskTypeData; struct ErtsPortTask_ { - erts_smp_atomic32_t state; + erts_atomic32_t state; ErtsPortTaskType type; union { struct { @@ -104,9 +124,7 @@ struct ErtsPortTaskHandleList_ { ErtsPortTaskHandle handle; union { ErtsPortTaskHandleList *next; -#ifdef ERTS_SMP ErtsThrPrgrLaterOp release; -#endif } u; }; @@ -129,35 +147,29 @@ static void begin_port_cleanup(Port *pp, ErtsPortTask **execq, int *processing_busy_q_p); -ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task, - ErtsPortTask, - 1000, - ERTS_ALC_T_PORT_TASK) +ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task, + ErtsPortTask, + 1000, + ERTS_ALC_T_PORT_TASK) ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table, ErtsPortTaskBusyCallerTable, 50, ERTS_ALC_T_BUSY_CALLER_TAB) -#ifdef ERTS_SMP static void call_port_task_free(void *vptp) { port_task_free((ErtsPortTask *) vptp); } -#endif static ERTS_INLINE void schedule_port_task_free(ErtsPortTask *ptp) { -#ifdef ERTS_SMP erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free, (void *) ptp, &ptp->u.release, sizeof(ErtsPortTask)); -#else - port_task_free(ptp); -#endif } static ERTS_INLINE ErtsPortTask * @@ -171,20 +183,44 @@ p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp) return ptp; } -ErtsProc2PortSigData * -erts_port_task_alloc_p2p_sig_data(void) +static ERTS_INLINE ErtsProc2PortSigData * +p2p_sig_data_init(ErtsPortTask *ptp) { - ErtsPortTask *ptp = port_task_alloc(); ptp->type = ERTS_PORT_TASK_PROC_SIG; ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP; - erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); + erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data)); return &ptp->u.alive.td.psig.data; } +ErtsProc2PortSigData * +erts_port_task_alloc_p2p_sig_data(void) +{ + ErtsPortTask *ptp = port_task_alloc(); + + return p2p_sig_data_init(ptp); +} + +ErtsProc2PortSigData * +erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr) +{ + ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK, + sizeof(ErtsPortTask) + extra); + + *extra_ptr = ptp+1; + + return p2p_sig_data_init(ptp); +} + +void +erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp) +{ + schedule_port_task_free(p2p_sig_data_to_task(sigdp)); +} + static ERTS_INLINE Eterm task_caller(ErtsPortTask *ptp) { @@ -244,7 +280,7 @@ popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last) #ifdef DEBUG erts_aint32_t flags = #endif - erts_smp_atomic32_read_band_nob( + erts_atomic32_read_band_nob( &pp->sched.flags, ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); @@ -291,7 +327,7 @@ busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp) #ifdef DEBUG flags = #endif - erts_smp_atomic32_read_bor_nob(&pp->sched.flags, + erts_atomic32_read_bor_nob(&pp->sched.flags, ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); @@ -431,7 +467,7 @@ no_sig_dep_move_from_busyq(Port *pp) int bix; erts_aint32_t flags = #endif - erts_smp_atomic32_read_band_nob( + erts_atomic32_read_band_nob( &pp->sched.flags, ~ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS); @@ -464,11 +500,11 @@ chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue) if (!first) { ASSERT(!tabp); ASSERT(!pp->sched.taskq.local.busy.last); - ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); + ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS)); return; } - ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS); + ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS); ASSERT(tabp); tot_count = 0; @@ -524,13 +560,13 @@ chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue) static ERTS_INLINE void reset_port_task_handle(ErtsPortTaskHandle *pthp) { - erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL); + erts_atomic_set_relb(pthp, (erts_aint_t) NULL); } static ERTS_INLINE ErtsPortTask * handle2task(ErtsPortTaskHandle *pthp) { - return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp); + return (ErtsPortTask *) erts_atomic_read_acqb(pthp); } static ERTS_INLINE void @@ -543,11 +579,22 @@ reset_handle(ErtsPortTask *ptp) } static ERTS_INLINE void +reset_executed_io_task_handle(ErtsPortTask *ptp) +{ + if (ptp->u.alive.handle) { + ASSERT(ptp == handle2task(ptp->u.alive.handle)); + /* The port task handle is reset inside task_executed */ + erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle, + reset_port_task_handle); + } +} + +static ERTS_INLINE void set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) { ptp->u.alive.handle = pthp; if (pthp) { - erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp); + erts_atomic_set_relb(pthp, (erts_aint_t) ptp); ASSERT(ptp == handle2task(ptp->u.alive.handle)); } } @@ -561,7 +608,7 @@ set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp) * IMPORTANT! Task either need to be aborted, or task handle * need to be detached before thread progress has been made. */ - erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp); + erts_atomic_set_relb(pthp, (erts_aint_t) ptp); } } @@ -579,20 +626,20 @@ check_unset_busy_port_q(Port *pp, int resume_procs = 0; ASSERT(bpq); - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(pp)); erts_port_task_sched_lock(&pp->sched); - qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); - low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size); + low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low); if (qsize < low) { erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT_Q); - flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask); + flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask); if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) resume_procs = 1; } else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) { - flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, + flags = erts_atomic32_read_band_relb(&pp->sched.flags, ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; } @@ -617,16 +664,16 @@ aborted_proc2port_data(Port *pp, ErlDrvSizeT size) bpq = pp->sched.taskq.bpq; - qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size, (erts_aint_t) -size); ASSERT(qsz + size > qsz); - flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + flags = erts_atomic32_read_nob(&pp->sched.flags); ASSERT(pp->sched.taskq.bpq); if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q) return; - if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) - erts_smp_atomic32_read_bor_nob(&pp->sched.flags, + if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) + erts_atomic32_read_bor_nob(&pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } @@ -644,13 +691,13 @@ dequeued_proc2port_data(Port *pp, ErlDrvSizeT size) bpq = pp->sched.taskq.bpq; - qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size, + qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size, (erts_aint_t) -size); ASSERT(qsz + size > qsz); - flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + flags = erts_atomic32_read_nob(&pp->sched.flags); if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q)) return; - if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low)) + if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low)) check_unset_busy_port_q(pp, flags, bpq); } @@ -663,19 +710,19 @@ enqueue_proc2port_data(Port *pp, if (sigdp && bpq) { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp); if (size) { - erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size, + erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size, (erts_aint_t) size); ErlDrvSizeT qsz = (ErlDrvSizeT) asize; ASSERT(qsz - size < qsz); if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) { - flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags, + flags = erts_atomic32_read_bor_acqb(&pp->sched.flags, ERTS_PTS_FLG_BUSY_PORT_Q); flags |= ERTS_PTS_FLG_BUSY_PORT_Q; - qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size); - if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) { - flags = (erts_smp_atomic32_read_bor_relb( + qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size); + if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) { + flags = (erts_atomic32_read_bor_relb( &pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)); flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q; @@ -723,18 +770,18 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp erts_aint32_t flags; pp->sched.taskq.bpq = NULL; flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); - flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags); + flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags); if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q) resume_procs = 1; } else { if (!low) - low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low); + low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low); else { if (bpq->high < low) bpq->high = low; - erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + erts_atomic_set_relb(&bpq->low, (erts_aint_t) low); written = 1; } @@ -743,19 +790,19 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp else { if (low > high) { low = high; - erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low); + erts_atomic_set_relb(&bpq->low, (erts_aint_t) low); } bpq->high = high; written = 1; } if (written) { - ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size); + ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size); if (size > high) - erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + erts_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_BUSY_PORT_Q); else if (size < low) - erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + erts_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q); } } @@ -774,56 +821,62 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp * No-suspend handles. */ -#ifdef ERTS_SMP static void free_port_task_handle_list(void *vpthlp) { erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp); } -#endif static void schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp) { -#ifdef ERTS_SMP erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list, (void *) pthlp, &pthlp->u.release, sizeof(ErtsPortTaskHandleList)); -#else - erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp); -#endif } static ERTS_INLINE void -abort_nosuspend_task(Port *pp, - ErtsPortTaskType type, - ErtsPortTaskTypeData *tdp) +abort_signal_task(Port *pp, + int abort_type, + ErtsPortTaskType type, + ErtsPortTaskTypeData *tdp, + int bpq_data) { ASSERT(type == ERTS_PORT_TASK_PROC_SIG); - if (!pp->sched.taskq.bpq) + if (!bpq_data) tdp->psig.callback(NULL, ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + abort_type, &tdp->psig.data); else { ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data); tdp->psig.callback(NULL, ERTS_PORT_SFLG_INVALID, - ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, + abort_type, &tdp->psig.data); aborted_proc2port_data(pp, size); } } + +static ERTS_INLINE void +abort_nosuspend_task(Port *pp, + ErtsPortTaskType type, + ErtsPortTaskTypeData *tdp, + int bpq_data) +{ + abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, type, tdp, bpq_data); +} + static ErtsPortTaskHandleList * get_free_nosuspend_handles(Port *pp) { ErtsPortTaskHandleList *nshp, *last_nshp = NULL; - ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched)); + ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched)); nshp = pp->sched.taskq.local.busy.nosuspend; @@ -839,7 +892,7 @@ get_free_nosuspend_handles(Port *pp) pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next; last_nshp->u.next = NULL; if (!pp->sched.taskq.local.busy.nosuspend) - erts_smp_atomic32_read_band_nob(&pp->sched.flags, + erts_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_NS_TASKS); } return nshp; @@ -862,7 +915,7 @@ free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp) static ERTS_INLINE void enqueue_port(ErtsRunQueue *runq, Port *pp) { - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); pp->sched.next = NULL; if (runq->ports.end) { ASSERT(runq->ports.start); @@ -876,19 +929,17 @@ enqueue_port(ErtsRunQueue *runq, Port *pp) runq->ports.end = pp; ASSERT(runq->ports.start && runq->ports.end); - erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); + erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); -#ifdef ERTS_SMP - if (runq->halt_in_progress) + if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING) erts_non_empty_runq(runq); -#endif } static ERTS_INLINE Port * pop_port(ErtsRunQueue *runq) { Port *pp = runq->ports.start; - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); if (!pp) { ASSERT(!runq->ports.end); } @@ -898,7 +949,7 @@ pop_port(ErtsRunQueue *runq) ASSERT(runq->ports.end == pp); runq->ports.end = NULL; } - erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); + erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL); } ASSERT(runq->ports.start || !runq->ports.end); @@ -925,7 +976,7 @@ enqueue_task(Port *pp, if (ns_pthlp) fail_flags |= ERTS_PTS_FLG_BUSY_PORT; erts_port_task_sched_lock(&pp->sched); - flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + flags = erts_atomic32_read_nob(&pp->sched.flags); if (flags & fail_flags) res = 0; else { @@ -956,7 +1007,7 @@ enqueue_task(Port *pp, static ERTS_INLINE void prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { - erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags); + erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags); if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) { *execqp = pp->sched.taskq.local.first; @@ -977,7 +1028,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) new &= ~ERTS_PTS_FLG_IN_RUNQ; new |= ERTS_PTS_FLG_EXEC; - act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp); + act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp); ASSERT(act & ERTS_PTS_FLG_IN_RUNQ); @@ -991,6 +1042,7 @@ static ERTS_INLINE int finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) { erts_aint32_t act; + unsigned int prof_runnable_ports; if (!processing_busy_q) pp->sched.taskq.local.first = *execq; @@ -1003,10 +1055,14 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) *execq = NULL; - act = erts_smp_atomic32_read_nob(&pp->sched.flags); + act = erts_atomic32_read_nob(&pp->sched.flags); if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq); + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&pp->sched); + while (1) { erts_aint32_t new, exp; @@ -1016,21 +1072,33 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q) if (act & ERTS_PTS_FLG_HAVE_TASKS) new |= ERTS_PTS_FLG_IN_RUNQ; - act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); + act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); - ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); + ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ)); + ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM)); if (exp == act) break; } + if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { + /* trace port scheduling, out */ + if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) + trace_sched_ports(pp, am_out); + if (prof_runnable_ports) { + if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS))) + profile_runnable_port(pp, am_inactive); + erts_port_task_sched_unlock(&pp->sched); + } + } + return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0; } static ERTS_INLINE erts_aint32_t select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) { - erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags); + erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags); if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq); @@ -1140,7 +1208,7 @@ fetch_in_queue(Port *pp, ErtsPortTask **execqp) if (ptp) *execqp = ptp->u.alive.next; else - erts_smp_atomic32_read_band_nob(&pp->sched.flags, + erts_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_TASKS); @@ -1203,7 +1271,7 @@ erl_drv_consume_timeslice(ErlDrvPort dprt, int percent) void erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp) { - ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying()); + ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying()); reset_port_task_handle(pthp); } @@ -1216,9 +1284,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) { int res; ErtsPortTask *ptp; -#ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay(); -#endif ptp = handle2task(pthp); if (!ptp) @@ -1228,41 +1294,25 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp) #ifdef DEBUG ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle; - ERTS_SMP_READ_MEMORY_BARRIER; - old_state = erts_smp_atomic32_read_nob(&ptp->state); + ERTS_THR_READ_MEMORY_BARRIER; + old_state = erts_atomic32_read_nob(&ptp->state); if (old_state == ERTS_PT_STATE_SCHEDULED) { ASSERT(!saved_pthp || saved_pthp == pthp); } #endif - old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + old_state = erts_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_ABORTED, ERTS_PT_STATE_SCHEDULED); if (old_state != ERTS_PT_STATE_SCHEDULED) res = - 1; /* Task already aborted, executing, or executed */ else { - reset_port_task_handle(pthp); - - switch (ptp->type) { - case ERTS_PORT_TASK_INPUT: - case ERTS_PORT_TASK_OUTPUT: - case ERTS_PORT_TASK_EVENT: - ASSERT(erts_smp_atomic_read_nob( - &erts_port_task_outstanding_io_tasks) > 0); - erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks); - break; - default: - break; - } - res = 0; } } -#ifdef ERTS_SMP erts_thr_progress_unmanaged_continue(dhndl); -#endif return res; } @@ -1271,12 +1321,10 @@ void erts_port_task_abort_nosuspend_tasks(Port *pp) { ErtsPortTaskHandleList *abort_list; -#ifdef ERTS_SMP ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID; -#endif erts_port_task_sched_lock(&pp->sched); - erts_smp_atomic32_read_band_nob(&pp->sched.flags, + erts_atomic32_read_band_nob(&pp->sched.flags, ~ERTS_PTS_FLG_HAVE_NS_TASKS); abort_list = pp->sched.taskq.local.busy.nosuspend; pp->sched.taskq.local.busy.nosuspend = NULL; @@ -1296,40 +1344,34 @@ erts_port_task_abort_nosuspend_tasks(Port *pp) pthlp = abort_list; abort_list = pthlp->u.next; -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) dhndl = erts_thr_progress_unmanaged_delay(); -#endif pthp = &pthlp->handle; ptp = handle2task(pthp); if (!ptp) { -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); -#endif schedule_port_task_handle_list_free(pthlp); continue; } #ifdef DEBUG saved_pthp = ptp->u.alive.handle; - ERTS_SMP_READ_MEMORY_BARRIER; - old_state = erts_smp_atomic32_read_nob(&ptp->state); + ERTS_THR_READ_MEMORY_BARRIER; + old_state = erts_atomic32_read_nob(&ptp->state); if (old_state == ERTS_PT_STATE_SCHEDULED) { ASSERT(saved_pthp == pthp); } #endif - old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + old_state = erts_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_ABORTED, ERTS_PT_STATE_SCHEDULED); if (old_state != ERTS_PT_STATE_SCHEDULED) { /* Task already aborted, executing, or executed */ -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); -#endif schedule_port_task_handle_list_free(pthlp); continue; } @@ -1339,13 +1381,11 @@ erts_port_task_abort_nosuspend_tasks(Port *pp) type = ptp->type; td = ptp->u.alive.td; -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_thr_progress_unmanaged_continue(dhndl); -#endif schedule_port_task_handle_list_free(pthlp); - abort_nosuspend_task(pp, type, &td); + abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL); } } @@ -1361,46 +1401,38 @@ erts_port_task_schedule(Eterm id, { ErtsProc2PortSigData *sigdp = NULL; ErtsPortTaskHandleList *ns_pthlp = NULL; -#ifdef ERTS_SMP ErtsRunQueue *xrunq; ErtsThrPrgrDelayHandle dhndl; -#endif ErtsRunQueue *runq; Port *pp; ErtsPortTask *ptp = NULL; erts_aint32_t act, add_flags; + unsigned int prof_runnable_ports; - if (pthp && erts_port_task_is_scheduled(pthp)) { - ASSERT(0); - erts_port_task_abort(pthp); - } + ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp)); ASSERT(is_internal_port(id)); -#ifdef ERTS_SMP dhndl = erts_thr_progress_unmanaged_delay(); -#endif pp = erts_port_lookup_raw(id); -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) { if (pp) erts_port_inc_refc(pp); erts_thr_progress_unmanaged_continue(dhndl); } -#endif - - if (!pp) - goto fail; if (type != ERTS_PORT_TASK_PROC_SIG) { + if (!pp) + goto fail; + ptp = port_task_alloc(); ptp->type = type; ptp->u.alive.flags = 0; - erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); + erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED); set_handle(ptp, pthp); } @@ -1412,16 +1444,6 @@ erts_port_task_schedule(Eterm id, va_start(argp, type); ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); va_end(argp); - erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); - break; - } - case ERTS_PORT_TASK_EVENT: { - va_list argp; - va_start(argp, type); - ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent); - ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData); - va_end(argp); - erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks); break; } case ERTS_PORT_TASK_PROC_SIG: { @@ -1432,6 +1454,9 @@ erts_port_task_schedule(Eterm id, ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback); ptp->u.alive.flags |= va_arg(argp, int); va_end(argp); + if (!pp) + goto fail; + if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)) set_tmp_handle(ptp, pthp); else { @@ -1457,6 +1482,10 @@ erts_port_task_schedule(Eterm id, if (ns_pthlp) add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS; + prof_runnable_ports = erts_system_profile_flags.runnable_ports; + if (prof_runnable_ports) + erts_port_task_sched_lock(&pp->sched); + while (1) { erts_aint32_t new, exp; @@ -1469,7 +1498,7 @@ erts_port_task_schedule(Eterm id, if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) new |= ERTS_PTS_FLG_IN_RUNQ; - act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); + act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp); if (exp == act) { if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) @@ -1481,72 +1510,76 @@ erts_port_task_schedule(Eterm id, goto done; /* Died after our task insert... */ } + if (prof_runnable_ports) { + if (!(act & ERTS_PTS_FLG_EXEC_IMM)) + profile_runnable_port(pp, am_active); + erts_port_task_sched_unlock(&pp->sched); + prof_runnable_ports = 0; + } + /* Enqueue port on run-queue */ runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); -#ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); + ERTS_LC_ASSERT(runq != xrunq); + ERTS_LC_VERIFY_RQ(runq, pp); if (xrunq) { - /* Port emigrated ... */ - erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); - erts_smp_runq_unlock(runq); + /* Emigrate port ... */ + erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); + erts_runq_unlock(runq); runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); } -#endif enqueue_port(runq, pp); - - if (erts_system_profile_flags.runnable_ports) { - profile_runnable_port(pp, am_active); - } - erts_smp_runq_unlock(runq); + erts_runq_unlock(runq); - erts_smp_notify_inc_runq(runq); + erts_notify_inc_runq(runq); done: -#ifdef ERTS_SMP + if (prof_runnable_ports) + erts_port_task_sched_unlock(&pp->sched); + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); -#endif return 0; abort_nosuspend: -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); -#endif - abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td); + abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0); ASSERT(ns_pthlp); erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); - if (ptp) - port_task_free(ptp); + + ASSERT(ptp); + port_task_free(ptp); return 0; fail: -#ifdef ERTS_SMP if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) erts_port_dec_refc(pp); -#endif + + if (ptp) { + abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT, + ptp->type, &ptp->u.alive.td, 0); + port_task_free(ptp); + } if (ns_pthlp) erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp); - if (ptp) - port_task_free(ptp); - return -1; } @@ -1556,14 +1589,14 @@ erts_port_task_free_port(Port *pp) erts_aint32_t flags; ErtsRunQueue *runq; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(pp)); ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); runq = erts_port_runq(pp); if (!runq) ERTS_INTERNAL_ERROR("Missing run-queue"); erts_port_task_sched_lock(&pp->sched); - flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags, + flags = erts_atomic32_read_bor_relb(&pp->sched.flags, ERTS_PTS_FLG_EXIT); erts_port_task_sched_unlock(&pp->sched); erts_atomic32_read_bset_relb(&pp->state, @@ -1573,7 +1606,7 @@ erts_port_task_free_port(Port *pp) | ERTS_PORT_SFLG_FREE), ERTS_PORT_SFLG_FREE); - erts_smp_runq_unlock(runq); + erts_runq_unlock(runq); if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC))) begin_port_cleanup(pp, NULL, NULL); @@ -1586,13 +1619,12 @@ erts_port_task_free_port(Port *pp) * scheduling of processes. Run-queue lock should be held by caller. */ -int +void erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) { Port *pp; ErtsPortTask *execq; int processing_busy_q; - int res = 0; int vreds = 0; int reds = 0; erts_aint_t io_tasks_executed = 0; @@ -1600,37 +1632,39 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) erts_aint32_t state; int active; Uint64 start_time = 0; + ErtsSchedulerData *esdp = runq->scheduler; + ERTS_MSACC_PUSH_STATE_M(); - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); + ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); pp = pop_port(runq); if (!pp) { - res = 0; goto done; } - erts_smp_runq_unlock(runq); + ERTS_LC_VERIFY_RQ(runq, pp); + + erts_runq_unlock(runq); *curr_port_pp = pp; if (erts_sched_stat.enabled) { - ErtsSchedulerData *esdp = erts_get_scheduler_data(); Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no); int migrated = old && old != esdp->no; - erts_smp_spin_lock(&erts_sched_stat.lock); + erts_spin_lock(&erts_sched_stat.lock); erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++; erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++; if (migrated) { erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++; erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++; } - erts_smp_spin_unlock(&erts_sched_stat.lock); + erts_spin_unlock(&erts_sched_stat.lock); } prepare_exec(pp, &execq, &processing_busy_q); - erts_smp_port_lock(pp); + erts_port_lock(pp); /* trace port scheduling, in */ if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { @@ -1641,6 +1675,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) state = erts_atomic32_read_nob(&pp->state); pp->reds = ERTS_PORT_REDS_EXECUTE; + ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT); goto begin_handle_tasks; while (1) { @@ -1651,7 +1686,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) if (!ptp) break; - task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + task_state = erts_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_EXECUTING, ERTS_PT_STATE_SCHEDULED); if (task_state != ERTS_PT_STATE_SCHEDULED) { @@ -1659,53 +1694,56 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) goto aborted_port_task; } - reset_handle(ptp); - if (erts_system_monitor_long_schedule != 0) { start_time = erts_timestamp_millis(); } - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); - ERTS_SMP_CHK_NO_PROC_LOCKS; + ERTS_LC_ASSERT(erts_lc_is_port_locked(pp)); + ERTS_CHK_NO_PROC_LOCKS; ASSERT(pp->drv_ptr); switch (ptp->type) { case ERTS_PORT_TASK_TIMEOUT: - reds = ERTS_PORT_REDS_TIMEOUT; - if (!(state & ERTS_PORT_SFLGS_DEAD)) { - DTRACE_DRIVER(driver_timeout, pp); - (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); - } + reset_handle(ptp); + if (!ERTS_PTMR_IS_TIMED_OUT(pp)) + reds = 0; + else { + ERTS_PTMR_CLEAR(pp); + reds = ERTS_PORT_REDS_TIMEOUT; + if (!(state & ERTS_PORT_SFLGS_DEAD)) { + DTRACE_DRIVER(driver_timeout, pp); + LTTNG_DRIVER(driver_timeout, pp); + if (IS_TRACED_FL(pp, F_TRACE_RECEIVE)) + trace_port(pp, am_receive, am_timeout); + (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data); + } + } break; case ERTS_PORT_TASK_INPUT: reds = ERTS_PORT_REDS_INPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_input, pp); - /* NOTE some windows/ose drivers use ->ready_input + LTTNG_DRIVER(driver_ready_input, pp); + /* NOTE some windows drivers use ->ready_input for input and output */ (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); + reset_executed_io_task_handle(ptp); io_tasks_executed++; break; case ERTS_PORT_TASK_OUTPUT: reds = ERTS_PORT_REDS_OUTPUT; ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); DTRACE_DRIVER(driver_ready_output, pp); + LTTNG_DRIVER(driver_ready_output, pp); (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data, ptp->u.alive.td.io.event); - io_tasks_executed++; - break; - case ERTS_PORT_TASK_EVENT: - reds = ERTS_PORT_REDS_EVENT; - ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); - DTRACE_DRIVER(driver_event, pp); - (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data, - ptp->u.alive.td.io.event, - ptp->u.alive.td.io.event_data); + reset_executed_io_task_handle(ptp); io_tasks_executed++; break; case ERTS_PORT_TASK_PROC_SIG: { ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data; + reset_handle(ptp); ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0); if (!pp->sched.taskq.bpq) reds = ptp->u.alive.td.psig.callback(pp, @@ -1723,10 +1761,11 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) break; } case ERTS_PORT_TASK_DIST_CMD: + reset_handle(ptp); reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds); break; default: - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); break; @@ -1764,22 +1803,9 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) } erts_unblock_fpe(fpe_was_unmasked); + ERTS_MSACC_POP_STATE_M(); - /* trace port scheduling, out */ - if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) { - trace_sched_ports(pp, am_out); - } - - if (io_tasks_executed) { - ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - >= io_tasks_executed); - erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks, - -1*io_tasks_executed); - } - -#ifdef ERTS_SMP - ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); -#endif + ASSERT(runq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); active = finalize_exec(pp, &execq, processing_busy_q); @@ -1789,56 +1815,43 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp) *curr_port_pp = NULL; - erts_smp_runq_lock(runq); + erts_runq_lock(runq); - if (!active) { - if (erts_system_profile_flags.runnable_ports) - profile_runnable_port(pp, am_inactive); - } - else { -#ifdef ERTS_SMP + if (active) { ErtsRunQueue *xrunq; -#endif ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD)); -#ifdef ERTS_SMP xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL); + ERTS_LC_ASSERT(runq != xrunq); + ERTS_LC_VERIFY_RQ(runq, pp); if (!xrunq) { -#endif enqueue_port(runq, pp); /* No need to notify ourselves about inc in runq. */ -#ifdef ERTS_SMP } else { - /* Port emigrated ... */ - erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); - erts_smp_runq_unlock(runq); + /* Emigrate port... */ + erts_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq); + erts_runq_unlock(runq); xrunq = erts_port_runq(pp); ASSERT(xrunq); enqueue_port(xrunq, pp); - erts_smp_runq_unlock(xrunq); - erts_smp_notify_inc_runq(xrunq); + erts_runq_unlock(xrunq); + erts_notify_inc_runq(xrunq); - erts_smp_runq_lock(runq); + erts_runq_lock(runq); } -#endif } done: - res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks) - != (erts_aint_t) 0); runq->scheduler->reductions += reds; - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); - ERTS_PORT_REDUCTIONS_EXECUTED(runq, reds); - - return res; + ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); + ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds); } -#ifdef ERTS_SMP static void release_port(void *vport) { @@ -1854,7 +1867,6 @@ schedule_release_port(void *vport) { &pp->common.u.release); } -#endif static void begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) @@ -1865,7 +1877,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) ErtsPortTaskHandleList *free_nshp = NULL; ErtsProcList *plp; - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(pp)); /* * Abort remaining tasks... @@ -1938,11 +1950,11 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) qs[i] = ptp->u.alive.next; /* Normal case here is aborted tasks... */ - state = erts_smp_atomic32_read_nob(&ptp->state); + state = erts_atomic32_read_nob(&ptp->state); if (state == ERTS_PT_STATE_ABORTED) goto aborted_port_task; - state = erts_smp_atomic32_cmpxchg_nob(&ptp->state, + state = erts_atomic32_cmpxchg_nob(&ptp->state, ERTS_PT_STATE_EXECUTING, ERTS_PT_STATE_SCHEDULED); if (state != ERTS_PT_STATE_SCHEDULED) { @@ -1969,13 +1981,6 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) DO_WRITE, 1); break; - case ERTS_PORT_TASK_EVENT: - erts_stale_drv_select(pp->common.id, - ERTS_Port2ErlDrvPort(pp), - ptp->u.alive.td.io.event, - 0, - 1); - break; case ERTS_PORT_TASK_DIST_CMD: break; case ERTS_PORT_TASK_PROC_SIG: { @@ -1996,7 +2001,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) break; } default: - erl_exit(ERTS_ABORT_EXIT, + erts_exit(ERTS_ABORT_EXIT, "Invalid port task type: %d\n", (int) ptp->type); } @@ -2006,7 +2011,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) } } - erts_smp_atomic32_read_band_nob(&pp->sched.flags, + erts_atomic32_read_band_nob(&pp->sched.flags, ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS |ERTS_PTS_FLG_HAVE_TASKS |ERTS_PTS_FLGS_BUSY)); @@ -2048,7 +2053,6 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) /* * Schedule cleanup of port structure... */ -#ifdef ERTS_SMP /* We might not be a scheduler, eg. traceing to port we are sys_msg_dispatcher */ if (!erts_get_scheduler_data()) { erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp); @@ -2058,26 +2062,15 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p) (void *) pp, &pp->common.u.release); } -#else - pp->cleanup = 1; -#endif -} - -int -erts_port_is_scheduled(Port *pp) -{ - erts_aint32_t flags = erts_smp_atomic32_read_acqb(&pp->sched.flags); - return (flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)) != 0; } -#ifdef ERTS_SMP void erts_enqueue_port(ErtsRunQueue *rq, Port *pp) { - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); - ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); - ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ); + ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); + ASSERT(rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); + ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ); enqueue_port(rq, pp); } @@ -2085,16 +2078,15 @@ Port * erts_dequeue_port(ErtsRunQueue *rq) { Port *pp; - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq)); + ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq)); pp = pop_port(rq); ASSERT(!pp - || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue)); - ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags) + || rq == (ErtsRunQueue *) erts_atomic_read_nob(&pp->run_queue)); + ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ)); return pp; } -#endif /* * Initialize the module. @@ -2102,8 +2094,7 @@ erts_dequeue_port(ErtsRunQueue *rq) void erts_port_task_init(void) { - erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks, - (erts_aint_t) 0); - init_port_task_alloc(); + init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads + + 1); /* aux_thread */ init_busy_caller_table_alloc(); } |