aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_port_task.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_port_task.c')
-rw-r--r--erts/emulator/beam/erl_port_task.c349
1 files changed, 111 insertions, 238 deletions
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index 4d7a86398a..4a3671df0c 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -36,6 +36,7 @@
#include "erl_check_io.h"
#include "dtrace-wrapper.h"
#include "lttng-wrapper.h"
+#include "erl_check_io.h"
#include <stdarg.h>
/*
@@ -83,15 +84,12 @@ static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_q
#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
#endif
-#define ERTS_SMP_LC_VERIFY_RQ(RQ, PP) \
- do { \
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq)); \
- ERTS_SMP_LC_ASSERT((RQ) == ((ErtsRunQueue *) \
- erts_smp_atomic_read_nob(&(PP)->run_queue))); \
+#define ERTS_LC_VERIFY_RQ(RQ, PP) \
+ do { \
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq)); \
+ ERTS_LC_ASSERT((RQ) == erts_get_runq_port((PP))); \
} while (0)
-erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
-
#define ERTS_PT_STATE_SCHEDULED 0
#define ERTS_PT_STATE_ABORTED 1
#define ERTS_PT_STATE_EXECUTING 2
@@ -99,7 +97,6 @@ erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
typedef union {
struct { /* I/O tasks */
ErlDrvEvent event;
- ErlDrvEventData event_data;
} io;
struct {
ErtsProc2PortSigCallback callback;
@@ -108,7 +105,7 @@ typedef union {
} ErtsPortTaskTypeData;
struct ErtsPortTask_ {
- erts_smp_atomic32_t state;
+ erts_atomic32_t state;
ErtsPortTaskType type;
union {
struct {
@@ -126,9 +123,7 @@ struct ErtsPortTaskHandleList_ {
ErtsPortTaskHandle handle;
union {
ErtsPortTaskHandleList *next;
-#ifdef ERTS_SMP
ErtsThrPrgrLaterOp release;
-#endif
} u;
};
@@ -151,35 +146,29 @@ static void begin_port_cleanup(Port *pp,
ErtsPortTask **execq,
int *processing_busy_q_p);
-ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(port_task,
- ErtsPortTask,
- 1000,
- ERTS_ALC_T_PORT_TASK)
+ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task,
+ ErtsPortTask,
+ 1000,
+ ERTS_ALC_T_PORT_TASK)
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
ErtsPortTaskBusyCallerTable,
50,
ERTS_ALC_T_BUSY_CALLER_TAB)
-#ifdef ERTS_SMP
static void
call_port_task_free(void *vptp)
{
port_task_free((ErtsPortTask *) vptp);
}
-#endif
static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
-#ifdef ERTS_SMP
erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
(void *) ptp,
&ptp->u.release,
sizeof(ErtsPortTask));
-#else
- port_task_free(ptp);
-#endif
}
static ERTS_INLINE ErtsPortTask *
@@ -199,7 +188,7 @@ p2p_sig_data_init(ErtsPortTask *ptp)
ptp->type = ERTS_PORT_TASK_PROC_SIG;
ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
- erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+ erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
@@ -290,7 +279,7 @@ popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
#ifdef DEBUG
erts_aint32_t flags =
#endif
- erts_smp_atomic32_read_band_nob(
+ erts_atomic32_read_band_nob(
&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
@@ -337,7 +326,7 @@ busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
#ifdef DEBUG
flags =
#endif
- erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ erts_atomic32_read_bor_nob(&pp->sched.flags,
ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
@@ -477,7 +466,7 @@ no_sig_dep_move_from_busyq(Port *pp)
int bix;
erts_aint32_t flags =
#endif
- erts_smp_atomic32_read_band_nob(
+ erts_atomic32_read_band_nob(
&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
@@ -510,11 +499,11 @@ chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
if (!first) {
ASSERT(!tabp);
ASSERT(!pp->sched.taskq.local.busy.last);
- ASSERT(!(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
+ ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
return;
}
- ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
+ ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
ASSERT(tabp);
tot_count = 0;
@@ -570,13 +559,13 @@ chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
- erts_smp_atomic_set_relb(pthp, (erts_aint_t) NULL);
+ erts_atomic_set_relb(pthp, (erts_aint_t) NULL);
}
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
- return (ErtsPortTask *) erts_smp_atomic_read_acqb(pthp);
+ return (ErtsPortTask *) erts_atomic_read_acqb(pthp);
}
static ERTS_INLINE void
@@ -593,8 +582,9 @@ reset_executed_io_task_handle(ErtsPortTask *ptp)
{
if (ptp->u.alive.handle) {
ASSERT(ptp == handle2task(ptp->u.alive.handle));
- erts_io_notify_port_task_executed(ptp->u.alive.handle);
- reset_port_task_handle(ptp->u.alive.handle);
+ /* The port task handle is reset inside task_executed */
+ erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
+ reset_port_task_handle);
}
}
@@ -603,7 +593,7 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
ptp->u.alive.handle = pthp;
if (pthp) {
- erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
+ erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
ASSERT(ptp == handle2task(ptp->u.alive.handle));
}
}
@@ -617,7 +607,7 @@ set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
* IMPORTANT! Task either need to be aborted, or task handle
* need to be detached before thread progress has been made.
*/
- erts_smp_atomic_set_relb(pthp, (erts_aint_t) ptp);
+ erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
}
}
@@ -635,20 +625,20 @@ check_unset_busy_port_q(Port *pp,
int resume_procs = 0;
ASSERT(bpq);
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
erts_port_task_sched_lock(&pp->sched);
- qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
- low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
+ low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
if (qsize < low) {
erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
| ERTS_PTS_FLG_BUSY_PORT_Q);
- flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
+ flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask);
if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
resume_procs = 1;
}
else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
- flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
+ flags = erts_atomic32_read_band_relb(&pp->sched.flags,
~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
}
@@ -673,16 +663,16 @@ aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
bpq = pp->sched.taskq.bpq;
- qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) -size);
ASSERT(qsz + size > qsz);
- flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ flags = erts_atomic32_read_nob(&pp->sched.flags);
ASSERT(pp->sched.taskq.bpq);
if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
| ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
return;
- if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low))
- erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low))
+ erts_atomic32_read_bor_nob(&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
@@ -700,13 +690,13 @@ dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
bpq = pp->sched.taskq.bpq;
- qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) -size);
ASSERT(qsz + size > qsz);
- flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ flags = erts_atomic32_read_nob(&pp->sched.flags);
if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
return;
- if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
+ if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low))
check_unset_busy_port_q(pp, flags, bpq);
}
@@ -719,19 +709,19 @@ enqueue_proc2port_data(Port *pp,
if (sigdp && bpq) {
ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
if (size) {
- erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size,
+ erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size,
(erts_aint_t) size);
ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
ASSERT(qsz - size < qsz);
if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
- flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
+ flags = erts_atomic32_read_bor_acqb(&pp->sched.flags,
ERTS_PTS_FLG_BUSY_PORT_Q);
flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
- qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
- if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
- flags = (erts_smp_atomic32_read_bor_relb(
+ qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size);
+ if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) {
+ flags = (erts_atomic32_read_bor_relb(
&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
@@ -779,18 +769,18 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp
erts_aint32_t flags;
pp->sched.taskq.bpq = NULL;
flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
- flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
+ flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags);
if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
resume_procs = 1;
}
else {
if (!low)
- low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
else {
if (bpq->high < low)
bpq->high = low;
- erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
written = 1;
}
@@ -799,19 +789,19 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp
else {
if (low > high) {
low = high;
- erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
}
bpq->high = high;
written = 1;
}
if (written) {
- ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
if (size > high)
- erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_BUSY_PORT_Q);
else if (size < low)
- erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
}
@@ -830,25 +820,19 @@ erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp
* No-suspend handles.
*/
-#ifdef ERTS_SMP
static void
free_port_task_handle_list(void *vpthlp)
{
erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
}
-#endif
static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
{
-#ifdef ERTS_SMP
erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
(void *) pthlp,
&pthlp->u.release,
sizeof(ErtsPortTaskHandleList));
-#else
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
-#endif
}
static ERTS_INLINE void
@@ -891,7 +875,7 @@ get_free_nosuspend_handles(Port *pp)
{
ErtsPortTaskHandleList *nshp, *last_nshp = NULL;
- ERTS_SMP_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));
+ ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));
nshp = pp->sched.taskq.local.busy.nosuspend;
@@ -907,7 +891,7 @@ get_free_nosuspend_handles(Port *pp)
pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
last_nshp->u.next = NULL;
if (!pp->sched.taskq.local.busy.nosuspend)
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_NS_TASKS);
}
return nshp;
@@ -930,7 +914,7 @@ free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
pp->sched.next = NULL;
if (runq->ports.end) {
ASSERT(runq->ports.start);
@@ -944,19 +928,17 @@ enqueue_port(ErtsRunQueue *runq, Port *pp)
runq->ports.end = pp;
ASSERT(runq->ports.start && runq->ports.end);
- erts_smp_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
+ erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
-#ifdef ERTS_SMP
if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
erts_non_empty_runq(runq);
-#endif
}
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
Port *pp = runq->ports.start;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
if (!pp) {
ASSERT(!runq->ports.end);
}
@@ -966,7 +948,7 @@ pop_port(ErtsRunQueue *runq)
ASSERT(runq->ports.end == pp);
runq->ports.end = NULL;
}
- erts_smp_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
+ erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
}
ASSERT(runq->ports.start || !runq->ports.end);
@@ -993,7 +975,7 @@ enqueue_task(Port *pp,
if (ns_pthlp)
fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
erts_port_task_sched_lock(&pp->sched);
- flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ flags = erts_atomic32_read_nob(&pp->sched.flags);
if (flags & fail_flags)
res = 0;
else {
@@ -1024,7 +1006,7 @@ enqueue_task(Port *pp,
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
- erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags);
if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
*execqp = pp->sched.taskq.local.first;
@@ -1045,7 +1027,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
new &= ~ERTS_PTS_FLG_IN_RUNQ;
new |= ERTS_PTS_FLG_EXEC;
- act = erts_smp_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);
+ act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);
ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);
@@ -1072,7 +1054,7 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
*execq = NULL;
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ act = erts_atomic32_read_nob(&pp->sched.flags);
if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
@@ -1089,7 +1071,7 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
if (act & ERTS_PTS_FLG_HAVE_TASKS)
new |= ERTS_PTS_FLG_IN_RUNQ;
- act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+ act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));
@@ -1115,7 +1097,7 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
- erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags);
if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
@@ -1225,7 +1207,7 @@ fetch_in_queue(Port *pp, ErtsPortTask **execqp)
if (ptp)
*execqp = ptp->u.alive.next;
else
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_TASKS);
@@ -1288,7 +1270,7 @@ erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
- ERTS_SMP_LC_ASSERT(erts_thr_progress_lc_is_delaying());
+ ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
reset_port_task_handle(pthp);
}
@@ -1301,9 +1283,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
int res;
ErtsPortTask *ptp;
-#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
-#endif
ptp = handle2task(pthp);
if (!ptp)
@@ -1313,41 +1293,25 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
#ifdef DEBUG
ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
- ERTS_SMP_READ_MEMORY_BARRIER;
- old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ ERTS_THR_READ_MEMORY_BARRIER;
+ old_state = erts_atomic32_read_nob(&ptp->state);
if (old_state == ERTS_PT_STATE_SCHEDULED) {
ASSERT(!saved_pthp || saved_pthp == pthp);
}
#endif
- old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_ABORTED,
ERTS_PT_STATE_SCHEDULED);
if (old_state != ERTS_PT_STATE_SCHEDULED)
res = - 1; /* Task already aborted, executing, or executed */
else {
-
reset_port_task_handle(pthp);
-
- switch (ptp->type) {
- case ERTS_PORT_TASK_INPUT:
- case ERTS_PORT_TASK_OUTPUT:
- case ERTS_PORT_TASK_EVENT:
- ASSERT(erts_smp_atomic_read_nob(
- &erts_port_task_outstanding_io_tasks) > 0);
- erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
- break;
- default:
- break;
- }
-
res = 0;
}
}
-#ifdef ERTS_SMP
erts_thr_progress_unmanaged_continue(dhndl);
-#endif
return res;
}
@@ -1356,12 +1320,10 @@ void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
ErtsPortTaskHandleList *abort_list;
-#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
-#endif
erts_port_task_sched_lock(&pp->sched);
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ erts_atomic32_read_band_nob(&pp->sched.flags,
~ERTS_PTS_FLG_HAVE_NS_TASKS);
abort_list = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = NULL;
@@ -1381,40 +1343,34 @@ erts_port_task_abort_nosuspend_tasks(Port *pp)
pthlp = abort_list;
abort_list = pthlp->u.next;
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
dhndl = erts_thr_progress_unmanaged_delay();
-#endif
pthp = &pthlp->handle;
ptp = handle2task(pthp);
if (!ptp) {
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
-#endif
schedule_port_task_handle_list_free(pthlp);
continue;
}
#ifdef DEBUG
saved_pthp = ptp->u.alive.handle;
- ERTS_SMP_READ_MEMORY_BARRIER;
- old_state = erts_smp_atomic32_read_nob(&ptp->state);
+ ERTS_THR_READ_MEMORY_BARRIER;
+ old_state = erts_atomic32_read_nob(&ptp->state);
if (old_state == ERTS_PT_STATE_SCHEDULED) {
ASSERT(saved_pthp == pthp);
}
#endif
- old_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_ABORTED,
ERTS_PT_STATE_SCHEDULED);
if (old_state != ERTS_PT_STATE_SCHEDULED) {
/* Task already aborted, executing, or executed */
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
-#endif
schedule_port_task_handle_list_free(pthlp);
continue;
}
@@ -1424,10 +1380,8 @@ erts_port_task_abort_nosuspend_tasks(Port *pp)
type = ptp->type;
td = ptp->u.alive.td;
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_thr_progress_unmanaged_continue(dhndl);
-#endif
schedule_port_task_handle_list_free(pthlp);
abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
@@ -1446,10 +1400,8 @@ erts_port_task_schedule(Eterm id,
{
ErtsProc2PortSigData *sigdp = NULL;
ErtsPortTaskHandleList *ns_pthlp = NULL;
-#ifdef ERTS_SMP
ErtsRunQueue *xrunq;
ErtsThrPrgrDelayHandle dhndl;
-#endif
ErtsRunQueue *runq;
Port *pp;
ErtsPortTask *ptp = NULL;
@@ -1460,19 +1412,15 @@ erts_port_task_schedule(Eterm id,
ASSERT(is_internal_port(id));
-#ifdef ERTS_SMP
dhndl = erts_thr_progress_unmanaged_delay();
-#endif
pp = erts_port_lookup_raw(id);
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
if (pp)
erts_port_inc_refc(pp);
erts_thr_progress_unmanaged_continue(dhndl);
}
-#endif
if (type != ERTS_PORT_TASK_PROC_SIG) {
if (!pp)
@@ -1483,7 +1431,7 @@ erts_port_task_schedule(Eterm id,
ptp->type = type;
ptp->u.alive.flags = 0;
- erts_smp_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
+ erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
set_handle(ptp, pthp);
}
@@ -1495,16 +1443,6 @@ erts_port_task_schedule(Eterm id,
va_start(argp, type);
ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
va_end(argp);
- erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
- break;
- }
- case ERTS_PORT_TASK_EVENT: {
- va_list argp;
- va_start(argp, type);
- ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
- ptp->u.alive.td.io.event_data = va_arg(argp, ErlDrvEventData);
- va_end(argp);
- erts_smp_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
@@ -1559,7 +1497,7 @@ erts_port_task_schedule(Eterm id,
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
new |= ERTS_PTS_FLG_IN_RUNQ;
- act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
+ act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
if (exp == act) {
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
@@ -1581,47 +1519,37 @@ erts_port_task_schedule(Eterm id,
/* Enqueue port on run-queue */
runq = erts_port_runq(pp);
- if (!runq)
- ERTS_INTERNAL_ERROR("Missing run-queue");
-#ifdef ERTS_SMP
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
- ERTS_SMP_LC_ASSERT(runq != xrunq);
- ERTS_SMP_LC_VERIFY_RQ(runq, pp);
+ ERTS_LC_ASSERT(runq != xrunq);
+ ERTS_LC_VERIFY_RQ(runq, pp);
if (xrunq) {
/* Emigrate port ... */
- erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
- erts_smp_runq_unlock(runq);
+ erts_set_runq_port(pp, xrunq);
+ erts_runq_unlock(runq);
runq = erts_port_runq(pp);
- if (!runq)
- ERTS_INTERNAL_ERROR("Missing run-queue");
}
-#endif
enqueue_port(runq, pp);
- erts_smp_runq_unlock(runq);
+ erts_runq_unlock(runq);
- erts_smp_notify_inc_runq(runq);
+ erts_notify_inc_runq(runq);
done:
if (prof_runnable_ports)
erts_port_task_sched_unlock(&pp->sched);
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
-#endif
return 0;
abort_nosuspend:
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
-#endif
abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);
@@ -1635,10 +1563,8 @@ abort_nosuspend:
fail:
-#ifdef ERTS_SMP
if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
erts_port_dec_refc(pp);
-#endif
if (ptp) {
abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT,
@@ -1658,14 +1584,12 @@ erts_port_task_free_port(Port *pp)
erts_aint32_t flags;
ErtsRunQueue *runq;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
runq = erts_port_runq(pp);
- if (!runq)
- ERTS_INTERNAL_ERROR("Missing run-queue");
erts_port_task_sched_lock(&pp->sched);
- flags = erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ flags = erts_atomic32_read_bor_relb(&pp->sched.flags,
ERTS_PTS_FLG_EXIT);
erts_port_task_sched_unlock(&pp->sched);
erts_atomic32_read_bset_relb(&pp->state,
@@ -1675,7 +1599,7 @@ erts_port_task_free_port(Port *pp)
| ERTS_PORT_SFLG_FREE),
ERTS_PORT_SFLG_FREE);
- erts_smp_runq_unlock(runq);
+ erts_runq_unlock(runq);
if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
begin_port_cleanup(pp, NULL, NULL);
@@ -1688,13 +1612,12 @@ erts_port_task_free_port(Port *pp)
* scheduling of processes. Run-queue lock should be held by caller.
*/
-int
+void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
Port *pp;
ErtsPortTask *execq;
int processing_busy_q;
- int res = 0;
int vreds = 0;
int reds = 0;
erts_aint_t io_tasks_executed = 0;
@@ -1705,17 +1628,16 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ErtsSchedulerData *esdp = runq->scheduler;
ERTS_MSACC_PUSH_STATE_M();
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
pp = pop_port(runq);
if (!pp) {
- res = 0;
goto done;
}
- ERTS_SMP_LC_VERIFY_RQ(runq, pp);
+ ERTS_LC_VERIFY_RQ(runq, pp);
- erts_smp_runq_unlock(runq);
+ erts_runq_unlock(runq);
*curr_port_pp = pp;
@@ -1723,19 +1645,19 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
int migrated = old && old != esdp->no;
- erts_smp_spin_lock(&erts_sched_stat.lock);
+ erts_spin_lock(&erts_sched_stat.lock);
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
if (migrated) {
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
}
- erts_smp_spin_unlock(&erts_sched_stat.lock);
+ erts_spin_unlock(&erts_sched_stat.lock);
}
prepare_exec(pp, &execq, &processing_busy_q);
- erts_smp_port_lock(pp);
+ erts_port_lock(pp);
/* trace port scheduling, in */
if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
@@ -1757,7 +1679,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
if (!ptp)
break;
- task_state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ task_state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_EXECUTING,
ERTS_PT_STATE_SCHEDULED);
if (task_state != ERTS_PT_STATE_SCHEDULED) {
@@ -1769,8 +1691,8 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
start_time = erts_timestamp_millis();
}
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
- ERTS_SMP_CHK_NO_PROC_LOCKS;
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_CHK_NO_PROC_LOCKS;
ASSERT(pp->drv_ptr);
switch (ptp->type) {
@@ -1812,17 +1734,6 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
reset_executed_io_task_handle(ptp);
io_tasks_executed++;
break;
- case ERTS_PORT_TASK_EVENT:
- reds = ERTS_PORT_REDS_EVENT;
- ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
- DTRACE_DRIVER(driver_event, pp);
- LTTNG_DRIVER(driver_event, pp);
- (*pp->drv_ptr->event)((ErlDrvData) pp->drv_data,
- ptp->u.alive.td.io.event,
- ptp->u.alive.td.io.event_data);
- reset_executed_io_task_handle(ptp);
- io_tasks_executed++;
- break;
case ERTS_PORT_TASK_PROC_SIG: {
ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
reset_handle(ptp);
@@ -1887,17 +1798,7 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
erts_unblock_fpe(fpe_was_unmasked);
ERTS_MSACC_POP_STATE_M();
-
- if (io_tasks_executed) {
- ASSERT(erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- >= io_tasks_executed);
- erts_smp_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
- -1*io_tasks_executed);
- }
-
-#ifdef ERTS_SMP
- ASSERT(runq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
-#endif
+ ASSERT(runq == erts_get_runq_port(pp));
active = finalize_exec(pp, &execq, processing_busy_q);
@@ -1907,54 +1808,42 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
*curr_port_pp = NULL;
- erts_smp_runq_lock(runq);
+ erts_runq_lock(runq);
if (active) {
-#ifdef ERTS_SMP
ErtsRunQueue *xrunq;
-#endif
ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));
-#ifdef ERTS_SMP
xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
- ERTS_SMP_LC_ASSERT(runq != xrunq);
- ERTS_SMP_LC_VERIFY_RQ(runq, pp);
+ ERTS_LC_ASSERT(runq != xrunq);
+ ERTS_LC_VERIFY_RQ(runq, pp);
if (!xrunq) {
-#endif
enqueue_port(runq, pp);
/* No need to notify ourselves about inc in runq. */
-#ifdef ERTS_SMP
}
else {
/* Emigrate port... */
- erts_smp_atomic_set_nob(&pp->run_queue, (erts_aint_t) xrunq);
- erts_smp_runq_unlock(runq);
+ erts_set_runq_port(pp, xrunq);
+ erts_runq_unlock(runq);
xrunq = erts_port_runq(pp);
- ASSERT(xrunq);
enqueue_port(xrunq, pp);
- erts_smp_runq_unlock(xrunq);
- erts_smp_notify_inc_runq(xrunq);
+ erts_runq_unlock(xrunq);
+ erts_notify_inc_runq(xrunq);
- erts_smp_runq_lock(runq);
+ erts_runq_lock(runq);
}
-#endif
}
done:
- res = (erts_smp_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
- != (erts_aint_t) 0);
runq->scheduler->reductions += reds;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(runq));
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
-
- return res;
}
-#ifdef ERTS_SMP
static void
release_port(void *vport)
{
@@ -1970,7 +1859,6 @@ schedule_release_port(void *vport) {
&pp->common.u.release);
}
-#endif
static void
begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
@@ -1981,7 +1869,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
ErtsPortTaskHandleList *free_nshp = NULL;
ErtsProcList *plp;
- ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+ ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
/*
* Abort remaining tasks...
@@ -2054,11 +1942,11 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
qs[i] = ptp->u.alive.next;
/* Normal case here is aborted tasks... */
- state = erts_smp_atomic32_read_nob(&ptp->state);
+ state = erts_atomic32_read_nob(&ptp->state);
if (state == ERTS_PT_STATE_ABORTED)
goto aborted_port_task;
- state = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
+ state = erts_atomic32_cmpxchg_nob(&ptp->state,
ERTS_PT_STATE_EXECUTING,
ERTS_PT_STATE_SCHEDULED);
if (state != ERTS_PT_STATE_SCHEDULED) {
@@ -2085,13 +1973,6 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
DO_WRITE,
1);
break;
- case ERTS_PORT_TASK_EVENT:
- erts_stale_drv_select(pp->common.id,
- ERTS_Port2ErlDrvPort(pp),
- ptp->u.alive.td.io.event,
- 0,
- 1);
- break;
case ERTS_PORT_TASK_DIST_CMD:
break;
case ERTS_PORT_TASK_PROC_SIG: {
@@ -2122,7 +2003,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
}
}
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ erts_atomic32_read_band_nob(&pp->sched.flags,
~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
|ERTS_PTS_FLG_HAVE_TASKS
|ERTS_PTS_FLGS_BUSY));
@@ -2164,7 +2045,6 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
/*
* Schedule cleanup of port structure...
*/
-#ifdef ERTS_SMP
/* We might not be a scheduler, eg. traceing to port we are sys_msg_dispatcher */
if (!erts_get_scheduler_data()) {
erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
@@ -2174,19 +2054,15 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
(void *) pp,
&pp->common.u.release);
}
-#else
- pp->cleanup = 1;
-#endif
}
-#ifdef ERTS_SMP
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
- ASSERT(rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(erts_smp_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
+ ASSERT(rq == erts_get_runq_port(pp));
+ ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
enqueue_port(rq, pp);
}
@@ -2194,16 +2070,14 @@ Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
Port *pp;
- ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(rq));
+ ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
pp = pop_port(rq);
- ASSERT(!pp
- || rq == (ErtsRunQueue *) erts_smp_atomic_read_nob(&pp->run_queue));
- ASSERT(!pp || (erts_smp_atomic32_read_nob(&pp->sched.flags)
+ ASSERT(!pp || rq == erts_get_runq_port(pp));
+ ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags)
& ERTS_PTS_FLG_IN_RUNQ));
return pp;
}
-#endif
/*
* Initialize the module.
@@ -2211,8 +2085,7 @@ erts_dequeue_port(ErtsRunQueue *rq)
void
erts_port_task_init(void)
{
- erts_smp_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
- (erts_aint_t) 0);
- init_port_task_alloc();
+ init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads
+ + 1); /* aux_thread */
init_busy_caller_table_alloc();
}