aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-11-29 01:24:43 +0100
committerRickard Green <[email protected]>2012-12-07 00:24:27 +0100
commit9e4895da833b7777e69efc173f5dc777aaea3201 (patch)
tree02338dbdbf0b449b7b643437bf2cdd0cf73e4615 /erts/emulator
parent43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff)
downloadotp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz
otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2
otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip
Add support for busy port message queue
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/dist.c19
-rw-r--r--erts/emulator/beam/erl_driver.h10
-rw-r--r--erts/emulator/beam/erl_port.h64
-rw-r--r--erts/emulator/beam/erl_port_task.c431
-rw-r--r--erts/emulator/beam/erl_port_task.h46
-rw-r--r--erts/emulator/beam/io.c163
-rw-r--r--erts/emulator/drivers/common/inet_drv.c72
7 files changed, 610 insertions, 195 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index a939b90eac..fd5efd89f1 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -2039,7 +2039,7 @@ erts_dist_command(Port *prt, int reds_limit)
if (reds > reds_limit)
goto preempted;
- if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) {
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -2057,7 +2057,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
- if (sched_flags & ERTS_PTS_FLG_BUSY)
+ if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
break;
} while (foq.first && !preempt);
if (!foq.first)
@@ -2066,7 +2066,7 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
}
- if (sched_flags & ERTS_PTS_FLG_BUSY) {
+ if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) {
if (oq.first) {
ErtsDistOutputBuf *ob;
int preempt;
@@ -2139,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
- if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt)
+ if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
goto finalize_only;
}
@@ -2168,7 +2168,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (!(sched_flags & ERTS_PTS_FLG_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
&& (dep->qflgs & ERTS_DE_QFLG_BUSY)
&& dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
@@ -2692,6 +2692,15 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits((ErlDrvPort) pp, &disable, NULL);
+ }
+
pp->dist_entry = dep;
dep->version = version;
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index fb9e92e44b..cbc7bd4ede 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -157,6 +157,7 @@ typedef struct {
#define ERL_DRV_FLAG_USE_PORT_LOCKING (1 << 0)
#define ERL_DRV_FLAG_SOFT_BUSY (1 << 1)
+#define ERL_DRV_FLAG_NO_BUSY_MSGQ (1 << 2)
/*
* Integer types
@@ -384,9 +385,18 @@ typedef struct erl_drv_entry {
ErlDrvEntry* driver_init(void)
#endif
+#define ERL_DRV_BUSY_MSGQ_DISABLED (~((ErlDrvSizeT) 0))
+#define ERL_DRV_BUSY_MSGQ_READ_ONLY ((ErlDrvSizeT) 0)
+#define ERL_DRV_BUSY_MSGQ_LIM_MAX (ERL_DRV_BUSY_MSGQ_DISABLED - 1)
+#define ERL_DRV_BUSY_MSGQ_LIM_MIN ((ErlDrvSizeT) 1)
+
/*
* These are the functions available for driver writers.
*/
+EXTERN void erl_drv_busy_msgq_limits(ErlDrvPort port,
+ ErlDrvSizeT *low,
+ ErlDrvSizeT *high);
+
EXTERN int driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on);
EXTERN int driver_event(ErlDrvPort port, ErlDrvEvent event,
ErlDrvEventData event_data);
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 1eb6dea710..67c8c28f3f 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -761,15 +761,36 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
+void erts_port_resume_procs(Port *);
+
struct binary;
-#define ERTS_P2P_SIG_DATA_FLG_BANG_OP (1 << 0)
-#define ERTS_P2P_SIG_DATA_FLG_REPLY (1 << 1)
-#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND (1 << 2)
-#define ERTS_P2P_SIG_DATA_FLG_FORCE (1 << 3)
-#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT (1 << 4)
-#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK (1 << 5)
-#define ERTS_P2P_SIG_DATA_FLG_SCHED (1 << 6)
+#define ERTS_P2P_SIG_TYPE_BAD 0
+#define ERTS_P2P_SIG_TYPE_OUTPUT 1
+#define ERTS_P2P_SIG_TYPE_OUTPUTV 2
+#define ERTS_P2P_SIG_TYPE_CONNECT 3
+#define ERTS_P2P_SIG_TYPE_EXIT 4
+#define ERTS_P2P_SIG_TYPE_CONTROL 5
+#define ERTS_P2P_SIG_TYPE_CALL 6
+#define ERTS_P2P_SIG_TYPE_INFO 7
+#define ERTS_P2P_SIG_TYPE_LINK 8
+#define ERTS_P2P_SIG_TYPE_UNLINK 9
+#define ERTS_P2P_SIG_TYPE_SET_DATA 10
+#define ERTS_P2P_SIG_TYPE_GET_DATA 11
+
+#define ERTS_P2P_SIG_TYPE_BITS 4
+#define ERTS_P2P_SIG_TYPE_MASK \
+ ((1 << ERTS_P2P_SIG_TYPE_BITS) - 1)
+
+#define ERTS_P2P_SIG_DATA_FLG(N) \
+ (1 << (ERTS_P2P_SIG_TYPE_BITS + (N)))
+#define ERTS_P2P_SIG_DATA_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG(0)
+#define ERTS_P2P_SIG_DATA_FLG_REPLY ERTS_P2P_SIG_DATA_FLG(1)
+#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG(2)
+#define ERTS_P2P_SIG_DATA_FLG_FORCE ERTS_P2P_SIG_DATA_FLG(3)
+#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG(4)
+#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG(5)
+#define ERTS_P2P_SIG_DATA_FLG_SCHED ERTS_P2P_SIG_DATA_FLG(6)
struct ErtsProc2PortSigData_ {
int flags;
@@ -823,6 +844,35 @@ struct ErtsProc2PortSigData_ {
} u;
} ;
+ERTS_GLB_INLINE int
+erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp);
+ERTS_GLB_INLINE ErlDrvSizeT
+erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE int
+erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp)
+{
+ switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
+ case ERTS_P2P_SIG_TYPE_OUTPUT: return !0;
+ case ERTS_P2P_SIG_TYPE_OUTPUTV: return !0;
+ default: return 0;
+ }
+}
+
+ERTS_GLB_INLINE ErlDrvSizeT
+erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp)
+{
+ switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
+ case ERTS_P2P_SIG_TYPE_OUTPUT: return sigdp->u.output.size;
+ case ERTS_P2P_SIG_TYPE_OUTPUTV: return sigdp->u.outputv.evp->size;
+ default: return (ErlDrvSizeT) 0;
+ }
+}
+
+#endif
+
#define ERTS_PROC2PORT_SIG_EXEC 0
#define ERTS_PROC2PORT_SIG_ABORT 1
#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index b6b113b753..89f6a4e774 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -539,103 +539,257 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
}
}
+
/*
- * No-suspend handles.
+ * Busy port queue management
*/
-#ifdef ERTS_SMP
-static void
-free_port_task_handle_list(void *vpthlp)
+static erts_aint32_t
+check_unset_busy_port_q(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTaskBusyPortQ *bpq)
{
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
-}
-#endif
+ ErlDrvSizeT qsize, low;
+ int resume_procs = 0;
-static void
-schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
-{
-#ifdef ERTS_SMP
- erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
- (void *) pthlp,
- &pthlp->u.release);
-#else
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
-#endif
+ ASSERT(bpq);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+
+ erts_port_task_sched_lock(&pp->sched);
+ qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ if (qsize < low) {
+ erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
+ ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ erts_port_task_sched_unlock(&pp->sched);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+
+ return flags;
}
static ERTS_INLINE void
-abort_nosuspend_task(ErtsPortTaskType type,
- ErtsPortTaskTypeData *tdp)
+aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- if (type != ERTS_PORT_TASK_PROC_SIG)
- ERTS_INTERNAL_ERROR("Invalid no-suspend port task type");
+ ASSERT(pp->sched.taskq.bpq);
- tdp->psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
- &tdp->psig.data);
-}
+ if (size == 0)
+ return;
+ bpq = pp->sched.taskq.bpq;
-static void
-save_nosuspend_handle(Port *pp, ErtsPortTask *ptp)
-{
- erts_aint32_t act;
- ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
- sizeof(ErtsPortTaskHandleList));
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ ASSERT(pp->sched.taskq.bpq);
+ if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low))
+ erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+}
- set_handle(ptp, &pthlp->handle);
+static ERTS_INLINE void
+dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
+{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- ASSERT(ptp == handle2task(&pthlp->handle));
- ASSERT(ptp->u.alive.handle == &pthlp->handle);
+ ASSERT(pp->sched.taskq.bpq);
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (size == 0)
+ return;
- if (!(act & ERTS_PTS_FLG_BUSY)) {
+ bpq = pp->sched.taskq.bpq;
- erts_port_task_sched_lock(&pp->sched);
- pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
- pp->sched.taskq.local.busy.nosuspend = pthlp;
- erts_port_task_sched_unlock(&pp->sched);
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
+ check_unset_busy_port_q(pp, flags, bpq);
+}
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+static ERTS_INLINE erts_aint32_t
+enqueue_proc2port_data(Port *pp,
+ ErtsProc2PortSigData *sigdp,
+ erts_aint32_t flags)
+{
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ if (sigdp && bpq) {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ if (size) {
+ erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) size);
+ ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
+
+ ASSERT(qsz - size < qsz);
+
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
+ flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
+ qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
+ flags = (erts_smp_atomic32_read_bor_relb(
+ &pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
+ flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ }
+ ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
+ }
+ }
+ return flags;
+}
- while (1) {
- erts_aint32_t exp, new;
+/*
+ * erl_drv_busy_msgq_limits() is called by drivers either reading or
+ * writing the limits.
+ *
+ * A limit of zero is interpreted as a read only request (using a
+ * limit of zero would not be useful). Other values are interpreted
+ * as a write-read request.
+ */
- if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS))
- == ERTS_PTS_FLG_HAVE_NS_TASKS)
- return;
+void
+erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
+{
+ Port *pp = erts_drvport2port(dport, NULL);
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ int written = 0, resume_procs = 0;
+ ErlDrvSizeT low, high;
+
+ if (!pp || !bpq) {
+ if (lowp)
+ *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ if (highp)
+ *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ return;
+ }
- if (act & ERTS_PTS_FLG_BUSY) {
- erts_aint32_t s;
- s = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
- ERTS_PT_STATE_ABORTED,
- ERTS_PT_STATE_SCHEDULED);
- if (s == ERTS_PT_STATE_SCHEDULED) {
- reset_port_task_handle(&pthlp->handle);
- break; /* Abort task */
- }
- /* Else: someone else handled it */
- return;
- }
+ low = lowp ? *lowp : 0;
+ high = highp ? *highp : 0;
- new = exp = act;
+ erts_port_task_sched_lock(&pp->sched);
- new |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+ if (low == ERL_DRV_BUSY_MSGQ_DISABLED
+ || high == ERL_DRV_BUSY_MSGQ_DISABLED) {
+ /* Disable busy msgq feature */
+ erts_aint32_t flags;
+ pp->sched.taskq.bpq = NULL;
+ flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else {
- act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
- if (act == exp)
- return;
+ if (!low)
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ else {
+ if (bpq->high < low)
+ bpq->high = low;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ written = 1;
+ }
+
+ if (!high)
+ high = bpq->high;
+ else {
+ if (low > high) {
+ low = high;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ }
+ bpq->high = high;
+ written = 1;
+ }
+ if (written) {
+ ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ if (size > high)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ else if (size < low)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
}
+ erts_port_task_sched_unlock(&pp->sched);
- abort_nosuspend_task(ptp->type, &ptp->u.alive.td);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+ if (lowp)
+ *lowp = low;
+ if (highp)
+ *highp = high;
}
+/*
+ * No-suspend handles.
+ */
+
+#ifdef ERTS_SMP
+static void
+free_port_task_handle_list(void *vpthlp)
+{
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
+}
+#endif
+
+static void
+schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
+ (void *) pthlp,
+ &pthlp->u.release);
+#else
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
+#endif
+}
+
+static ERTS_INLINE void
+abort_nosuspend_task(Port *pp,
+ ErtsPortTaskType type,
+ ErtsPortTaskTypeData *tdp)
+{
+
+ ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
+
+ if (!pp->sched.taskq.bpq)
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ aborted_proc2port_data(pp, size);
+ }
+}
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
@@ -724,14 +878,29 @@ pop_port(ErtsRunQueue *runq)
* Task queue operations
*/
-static ERTS_INLINE erts_aint32_t
-enqueue_task(Port *pp, ErtsPortTask *ptp)
+static ERTS_INLINE int
+enqueue_task(Port *pp,
+ ErtsPortTask *ptp,
+ ErtsProc2PortSigData *sigdp,
+ ErtsPortTaskHandleList *ns_pthlp,
+ erts_aint32_t *flagsp)
+
{
+ int res;
+ erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
erts_aint32_t flags;
ptp->u.alive.next = NULL;
+ if (ns_pthlp)
+ fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
erts_port_task_sched_lock(&pp->sched);
flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!(flags & ERTS_PTS_FLG_EXIT)) {
+ if (flags & fail_flags)
+ res = 0;
+ else {
+ if (ns_pthlp) {
+ ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
+ }
if (pp->sched.taskq.in.last) {
ASSERT(pp->sched.taskq.in.first);
ASSERT(!pp->sched.taskq.in.last->u.alive.next);
@@ -744,9 +913,12 @@ enqueue_task(Port *pp, ErtsPortTask *ptp)
pp->sched.taskq.in.first = ptp;
}
pp->sched.taskq.in.last = ptp;
+ flags = enqueue_proc2port_data(pp, sigdp, flags);
+ res = 1;
}
erts_port_task_sched_unlock(&pp->sched);
- return flags;
+ *flagsp = flags;
+ return res;
}
static ERTS_INLINE void
@@ -754,7 +926,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) {
+ if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
}
@@ -799,10 +971,9 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
*execq = NULL;
- /* guess a likely value */
- act = ERTS_PTS_FLG_EXEC;
- if (execq)
- act |= ERTS_PTS_FLG_HAVE_TASKS;
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
while (1) {
erts_aint32_t new, exp;
@@ -829,9 +1000,12 @@ select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
+
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
- if (flags & ERTS_PTS_FLG_BUSY) {
+ if (flags & ERTS_PTS_FLG_BUSY_PORT) {
if (*processing_busy_q_p) {
ErtsPortTask *ptp;
@@ -880,7 +1054,7 @@ check_task_for_exec(Port *pp,
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
- if ((flags & ERTS_PTS_FLG_BUSY)
+ if ((flags & ERTS_PTS_FLG_BUSY_PORT)
&& (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
busy_wait_move_to_busy_queue(pp, ptp);
@@ -902,7 +1076,7 @@ check_task_for_exec(Port *pp,
else {
/* Processing busy queue */
- ASSERT(!(flags & ERTS_PTS_FLG_BUSY));
+ ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
@@ -1019,10 +1193,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
break;
case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT,
- &ptp->u.alive.td.psig.data);
+ ERTS_INTERNAL_ERROR("Aborted process to port signal");
break;
default:
break;
@@ -1042,14 +1213,15 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
+ erts_aint32_t flags;
ErtsPortTaskHandleList *abort_list;
#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
#endif
erts_port_task_sched_lock(&pp->sched);
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
- ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
abort_list = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = NULL;
erts_port_task_sched_unlock(&pp->sched);
@@ -1117,7 +1289,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp)
#endif
schedule_port_task_handle_list_free(pthlp);
- abort_nosuspend_task(type, &td);
+ abort_nosuspend_task(pp, type, &td);
}
}
@@ -1131,6 +1303,8 @@ erts_port_task_schedule(Eterm id,
ErtsPortTaskType type,
...)
{
+ ErtsProc2PortSigData *sigdp = NULL;
+ ErtsPortTaskHandleList *ns_pthlp = NULL;
#ifdef ERTS_SMP
ErtsRunQueue *xrunq;
ErtsThrPrgrDelayHandle dhndl;
@@ -1138,7 +1312,7 @@ erts_port_task_schedule(Eterm id,
ErtsRunQueue *runq;
Port *pp;
ErtsPortTask *ptp = NULL;
- erts_aint32_t act;
+ erts_aint32_t act, add_flags;
if (pthp && erts_port_task_is_scheduled(pthp)) {
ASSERT(0);
@@ -1195,7 +1369,6 @@ erts_port_task_schedule(Eterm id,
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
- ErtsProc2PortSigData *sigdp;
va_list argp;
ASSERT(!pthp);
va_start(argp, type);
@@ -1204,31 +1377,40 @@ erts_port_task_schedule(Eterm id,
ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
ptp->u.alive.flags |= va_arg(argp, int);
va_end(argp);
- if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)
- save_nosuspend_handle(pp, ptp);
- else
+ if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
set_handle(ptp, pthp);
+ else {
+ ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
+ sizeof(ErtsPortTaskHandleList));
+ set_handle(ptp, &ns_pthlp->handle);
+ }
break;
}
default:
break;
}
- act = enqueue_task(pp, ptp);
- if (act & ERTS_PTS_FLG_EXIT) {
+ if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
reset_handle(ptp);
- goto fail;
+ if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
+ goto abort_nosuspend;
+ else
+ goto fail;
}
+ add_flags = ERTS_PTS_FLG_HAVE_TASKS;
+ if (ns_pthlp)
+ add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+
while (1) {
erts_aint32_t new, exp;
- if ((act & ERTS_PTS_FLG_HAVE_TASKS)
+ if ((act & add_flags) == add_flags
&& (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
goto done; /* Done */
new = exp = act;
- new |= ERTS_PTS_FLG_HAVE_TASKS;
+ new |= add_flags;
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
new |= ERTS_PTS_FLG_IN_RUNQ;
@@ -1281,6 +1463,22 @@ done:
return 0;
+abort_nosuspend:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td);
+
+ ASSERT(ns_pthlp);
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+ if (ptp)
+ port_task_free(ptp);
+
+ return 0;
+
fail:
#ifdef ERTS_SMP
@@ -1288,6 +1486,9 @@ fail:
erts_port_dec_refc(pp);
#endif
+ if (ns_pthlp)
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+
if (ptp)
port_task_free(ptp);
@@ -1444,13 +1645,24 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ptp->u.alive.td.io.event_data);
io_tasks_executed++;
break;
- case ERTS_PORT_TASK_PROC_SIG:
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
- reds += ptp->u.alive.td.psig.callback(pp,
- state,
- ERTS_PROC2PORT_SIG_EXEC,
- &ptp->u.alive.td.psig.data);
+ if (!pp->sched.taskq.bpq)
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ dequeued_proc2port_data(pp, size);
+ }
break;
+ }
case ERTS_PORT_TASK_DIST_CMD:
reds += erts_dist_command(pp, CONTEXT_REDS-reds);
break;
@@ -1633,12 +1845,23 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
- case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_CLOSED,
- &ptp->u.alive.td.psig.data);
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
+ if (!pp->sched.taskq.bpq)
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ aborted_proc2port_data(pp, size);
+ }
break;
+ }
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index c41f8104c7..9976604042 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -66,16 +66,20 @@ typedef enum {
extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#endif
-#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
-#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
-#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
-#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
-#define ERTS_PTS_FLG_BUSY (((erts_aint32_t) 1) << 4)
-#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 5)
-#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 6)
-#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 7)
-#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 8)
+#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
+#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
+#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
+#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
+#define ERTS_PTS_FLG_BUSY_PORT (((erts_aint32_t) 1) << 4)
+#define ERTS_PTS_FLG_BUSY_PORT_Q (((erts_aint32_t) 1) << 5)
+#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q (((erts_aint32_t) 1) << 6)
+#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 7)
+#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 8)
+#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 9)
+#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 10)
+#define ERTS_PTS_FLGS_BUSY \
+ (ERTS_PTS_FLG_BUSY_PORT | ERTS_PTS_FLG_BUSY_PORT_Q)
#define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP \
(ERTS_PTS_FLG_EXIT \
@@ -84,6 +88,15 @@ extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
| ERTS_PTS_FLG_EXEC \
| ERTS_PTS_FLG_FORCE_SCHED)
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH 8192
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW 4096
+
+typedef struct {
+ ErlDrvSizeT high;
+ erts_smp_atomic_t low;
+ erts_smp_atomic_t size;
+} ErtsPortTaskBusyPortQ;
+
typedef struct ErtsPortTask_ ErtsPortTask;
typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable;
typedef struct ErtsPortTaskHandleList_ ErtsPortTaskHandleList;
@@ -104,6 +117,7 @@ typedef struct {
ErtsPortTask *first;
ErtsPortTask *last;
} in;
+ ErtsPortTaskBusyPortQ *bpq;
} taskq;
erts_smp_atomic32_t flags;
#ifdef ERTS_SMP
@@ -113,6 +127,8 @@ typedef struct {
ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq);
ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
Eterm id);
ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
@@ -138,6 +154,18 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
return ((void *) erts_smp_atomic_read_nob(pthp)) != NULL;
}
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq)
+{
+ if (bpq) {
+ erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW;
+ erts_smp_atomic_init_nob(&bpq->low, low);
+ bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH;
+ erts_smp_atomic_init_nob(&bpq->size, (erts_aint_t) 0);
+ }
+ ptsp->taskq.bpq = bpq;
+}
+
ERTS_GLB_INLINE void
erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
{
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9024e9ab52..32f3a675fa 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -270,9 +270,10 @@ static Port *create_port(char *name,
Eterm pid,
int *enop)
{
+ ErtsPortTaskBusyPortQ *busy_port_queue;
Port *prt;
char *p;
- size_t port_size, size;
+ size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
#ifdef DEBUG
@@ -288,7 +289,13 @@ static Port *create_port(char *name,
}
else
#endif
- port_size = size = sizeof(Port);
+ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+
+ busy_port_queue_size
+ = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
+ ? 0
+ : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)));
+ size += busy_port_queue_size;
size += sys_strlen(name) + 1;
@@ -302,6 +309,13 @@ static Port *create_port(char *name,
prt = (Port *) p;
p += port_size;
+ if (!busy_port_queue_size)
+ busy_port_queue = NULL;
+ else {
+ busy_port_queue = (ErtsPortTaskBusyPortQ *) p;
+ p += busy_port_queue_size;
+ }
+
#ifdef ERTS_SMP
if (driver_lock) {
prt->lock = driver_lock;
@@ -320,6 +334,8 @@ static Port *create_port(char *name,
prt->cleanup = 0;
#endif
+ erts_port_task_pre_init_sched(&prt->sched, busy_port_queue);
+
prt->name = p;
sys_strcpy(p, name);
prt->drv_ptr = driver;
@@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process)
erts_aint32_t flags;
erts_port_task_sched_lock(&prt->sched);
flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- saved = ((flags & (ERTS_PTS_FLG_BUSY
- | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY);
+ saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT);
if (saved)
erts_proclist_store_last(&prt->suspended, erts_proclist_create(process));
erts_port_task_sched_unlock(&prt->sched);
@@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p,
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
- erts_aint32_t sched_flags, busy_flag, invalid_flags;
+ erts_aint32_t sched_flags, busy_flgs, invalid_flags;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
ErlDrvBinary *cbin = NULL;
@@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p,
| ERTS_PORT_SIG_FLG_NOSUSPEND
| ERTS_PORT_SIG_FLG_FORCE)) == 0);
- busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE)
+ busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE)
? ((erts_aint32_t) 0)
- : ERTS_PTS_FLG_BUSY);
- invalid_flags = busy_flag;
+ : ERTS_PTS_FLGS_BUSY);
+ invalid_flags = busy_flgs;
if (!refp)
invalid_flags |= ERTS_PTS_FLG_PARALLELISM;
@@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p,
*/
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT))
- return ((sched_flags & busy_flag)
- ? ERTS_PORT_OP_BUSY
- : ERTS_PORT_OP_DROPPED);
+ if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
+ return ((sched_flags & ERTS_PTS_FLG_EXIT)
+ ? ERTS_PORT_OP_DROPPED
+ : ERTS_PORT_OP_BUSY);
try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP));
@@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
sigdp->u.outputv.cbinp = cbin;
@@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
sigdp->u.output.from = from;
sigdp->u.output.bufp = buf;
sigdp->u.output.size = size;
@@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p,
}
task_flags = ERTS_PT_FLG_WAIT_BUSY;
- sigdp->flags = flags;
+ sigdp->flags |= flags;
if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) {
task_flags = 0;
if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE)
@@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p,
return res;
}
- if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs))
return ERTS_PORT_OP_BUSY_SCHEDULED;
return res;
@@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags;
sigdp->u.exit.from = from;
if (is_immed(reason)) {
@@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags;
sigdp->u.connect.from = from;
sigdp->u.connect.connected = connect_id;
@@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK;
sigdp->u.unlink.from = from;
return erts_schedule_proc2port_signal(c_p,
@@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_LINK;
sigdp->u.link.port = prt->common.id;
sigdp->u.link.to = to;
@@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL;
sigdp->u.control.binp = binp;
sigdp->u.control.command = command;
sigdp->u.control.bufp = bufp;
@@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CALL;
sigdp->u.call.command = command;
sigdp->u.call.bufp = bufp;
sigdp->u.call.size = size;
@@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_INFO;
sigdp->u.info.item = item;
return erts_schedule_proc2port_signal(c_p,
@@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA;
sigdp->u.set_data.data = set_data;
sigdp->u.set_data.bp = bp;
@@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on)
if (on) {
flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
- ERTS_PTS_FLG_BUSY);
- if (flags & ERTS_PTS_FLG_BUSY)
+ ERTS_PTS_FLG_BUSY_PORT);
+ if (flags & ERTS_PTS_FLG_BUSY_PORT)
return; /* Already busy */
if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS)
@@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on)
}
#endif
} else {
- ErtsProcList *plp;
-
flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags,
- ~ERTS_PTS_FLG_BUSY);
- if (!(flags & ERTS_PTS_FLG_BUSY))
+ ~ERTS_PTS_FLG_BUSY_PORT);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT))
return; /* Already non-busy */
#ifdef USE_VM_PROBES
@@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on)
erts_dist_port_not_busy(prt);
}
- /*
- * Resume, in a round-robin fashion, all processes waiting on the port.
- *
- * This version submitted by Tony Rogvall. The earlier version used
- * to resume the processes in order, which caused starvation of all but
- * the first process.
- */
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ erts_port_resume_procs(prt);
+ }
+}
- erts_port_task_sched_lock(&prt->sched);
- plp = prt->suspended;
- prt->suspended = NULL;
- erts_port_task_sched_unlock(&prt->sched);
+void
+erts_port_resume_procs(Port *prt)
+{
+ /*
+ * Resume, in a round-robin fashion, all processes waiting on the port.
+ *
+ * This version submitted by Tony Rogvall. The earlier version used
+ * to resume the processes in order, which caused starvation of all but
+ * the first process.
+ */
+ ErtsProcList *plp;
- if (erts_proclist_fetch(&plp, NULL)) {
+ erts_port_task_sched_lock(&prt->sched);
+ plp = prt->suspended;
+ prt->suspended = NULL;
+ erts_port_task_sched_unlock(&prt->sched);
+
+ if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
- /*
- * Hrm, for blocked dist ports, plp always seems to be NULL.
- * That's not so fun.
- * Well, another way to get the same info is using a D
- * script to correlate an earlier process-port_blocked+pid
- * event with a later process-scheduled event. That's
- * subject to the multi-CPU races with how events are
- * handled, but hey, that way works most of the time.
- */
- if (DTRACE_ENABLED(process_port_unblocked)) {
- DTRACE_CHARBUF(pid_str, 16);
- ErtsProcList* plp2 = plp;
-
- erts_snprintf(port_str, sizeof(port_str),
- "%T", prt->common.id);
- while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
- DTRACE2(process_port_unblocked, pid_str, port_str);
- }
- }
-#endif
+ /*
+ * Hrm, for blocked dist ports, plp always seems to be NULL.
+ * That's not so fun.
+ * Well, another way to get the same info is using a D
+ * script to correlate an earlier process-port_blocked+pid
+ * event with a later process-scheduled event. That's
+ * subject to the multi-CPU races with how events are
+ * handled, but hey, that way works most of the time.
+ */
+ if (DTRACE_ENABLED(process_port_unblocked)) {
+ DTRACE_CHARBUF(port_str, 16);
+ DTRACE_CHARBUF(pid_str, 16);
+ ErtsProcList* plp2 = plp;
- /* First proc should be resumed last */
- if (plp->next) {
- plp->next->prev = NULL;
- erts_resume_processes(plp->next);
- plp->next = NULL;
+ erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id);
+ while (plp2 != NULL) {
+ erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ DTRACE2(process_port_unblocked, pid_str, port_str);
}
- erts_resume_processes(plp);
- }
+ }
+#endif
+
+ /* First proc should be resumed last */
+ if (plp->next) {
+ plp->next->prev = NULL;
+ erts_resume_processes(plp->next);
+ plp->next = NULL;
+ }
+ erts_resume_processes(plp);
}
}
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 4a87b0a385..f68ce68407 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -692,6 +692,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */
#define INET_OPT_RAW 34 /* Raw socket options */
#define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */
+#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */
+#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -808,6 +810,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */
#define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */
+#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */
+#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */
#define INET_INFINITY 0xffffffff /* infinity value */
@@ -5537,6 +5541,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ high = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ low = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
tcp_descriptor* tdesc = (tcp_descriptor*) desc;
@@ -6422,6 +6448,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ ival = high > INT_MAX ? INT_MAX : (int) high;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ ival = low > INT_MAX ? INT_MAX : (int) low;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
*ptr++ = opt;
@@ -8029,6 +8081,7 @@ static int tcp_inet_init(void)
static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
+ ErlDrvSizeT q_low, q_high;
tcp_descriptor* desc;
DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port));
@@ -8038,6 +8091,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
return ERL_DRV_ERROR_ERRNO;
desc->high = INET_HIGH_WATERMARK;
desc->low = INET_LOW_WATERMARK;
+ q_high = INET_HIGH_MSGQ_WATERMARK;
+ q_low = INET_LOW_MSGQ_WATERMARK;
+ if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
desc->send_timeout = INET_INFINITY;
desc->send_timeout_close = 0;
desc->busy_on_send = 0;
@@ -8061,6 +8125,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
ErlDrvTermData owner, int* err)
{
+ ErlDrvSizeT q_low, q_high;
ErlDrvPort port = desc->inet.port;
tcp_descriptor* copy_desc;
@@ -8099,6 +8164,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
FREE(copy_desc);
return NULL;
}
+
+ /* Read busy msgq limits of parent */
+ q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high);
+ /* Write same busy msgq limits to child */
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+
copy_desc->inet.port = port;
copy_desc->inet.dport = driver_mk_port(port);
*err = 0;