aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2012-11-29 01:24:43 +0100
committerRickard Green <[email protected]>2012-12-07 00:24:27 +0100
commit9e4895da833b7777e69efc173f5dc777aaea3201 (patch)
tree02338dbdbf0b449b7b643437bf2cdd0cf73e4615
parent43ebafb5fb40aee326b951d18c1880e6e5fdef6b (diff)
downloadotp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.gz
otp-9e4895da833b7777e69efc173f5dc777aaea3201.tar.bz2
otp-9e4895da833b7777e69efc173f5dc777aaea3201.zip
Add support for busy port message queue
-rw-r--r--erts/doc/src/driver_entry.xml16
-rw-r--r--erts/doc/src/erl_driver.xml81
-rw-r--r--erts/emulator/beam/dist.c19
-rw-r--r--erts/emulator/beam/erl_driver.h10
-rw-r--r--erts/emulator/beam/erl_port.h64
-rw-r--r--erts/emulator/beam/erl_port_task.c431
-rw-r--r--erts/emulator/beam/erl_port_task.h46
-rw-r--r--erts/emulator/beam/io.c163
-rw-r--r--erts/emulator/drivers/common/inet_drv.c72
-rw-r--r--erts/preloaded/ebin/prim_inet.beambin66224 -> 70260 bytes
-rw-r--r--erts/preloaded/src/prim_inet.erl6
-rw-r--r--lib/kernel/doc/src/inet.xml64
-rw-r--r--lib/kernel/src/gen_tcp.erl4
-rw-r--r--lib/kernel/src/inet.erl11
-rw-r--r--lib/kernel/src/inet_int.hrl2
15 files changed, 779 insertions, 210 deletions
diff --git a/erts/doc/src/driver_entry.xml b/erts/doc/src/driver_entry.xml
index a2efdf3ebc..d65a38570d 100644
--- a/erts/doc/src/driver_entry.xml
+++ b/erts/doc/src/driver_entry.xml
@@ -377,11 +377,11 @@ typedef struct erl_drv_entry {
<tag><marker id="driver_flags"/>int driver_flags</tag>
<item>
- <p>This field is used to pass driver capability information to the
- runtime system. If the <c>extended_marker</c> field equals
- <c>ERL_DRV_EXTENDED_MARKER</c>, it should contain <c>0</c> or
- driver flags (<c>ERL_DRV_FLAG_*</c>) ored bitwise. Currently
- the following driver flags exist:
+ <p>This field is used to pass driver capability and other
+ information to the runtime system. If the
+ <c>extended_marker</c> field equals <c>ERL_DRV_EXTENDED_MARKER</c>,
+ it should contain <c>0</c> or driver flags (<c>ERL_DRV_FLAG_*</c>)
+ ored bitwise. Currently the following driver flags exist:
</p>
<taglist>
<tag><c>ERL_DRV_FLAG_USE_PORT_LOCKING</c></tag>
@@ -404,6 +404,12 @@ typedef struct erl_drv_entry {
by the Erlang distribution (the behaviour has always been
required by drivers used by the distribution).
</item>
+ <tag><c>ERL_DRV_FLAG_NO_BUSY_MSGQ</c></tag>
+ <item>Disable busy port message queue functionality. For
+ more information, see the documentation of the
+ <seealso marker="erl_driver#erl_drv_busy_msgq_limits">erl_drv_busy_msgq_limits()</seealso>
+ function.
+ </item>
</taglist>
</item>
<tag>void *handle2</tag>
diff --git a/erts/doc/src/erl_driver.xml b/erts/doc/src/erl_driver.xml
index fcce962557..903b8dfe64 100644
--- a/erts/doc/src/erl_driver.xml
+++ b/erts/doc/src/erl_driver.xml
@@ -1500,16 +1500,81 @@ typedef struct ErlIOVec {
</desc>
</func>
<func>
+ <name><ret>void</ret><nametext>erl_drv_busy_msgq_limits(ErlDrvPort port, ErlDrvSizeT *low, ErlDrvSizeT *high)</nametext></name>
+ <fsummary>Set and get limits for busy port message queue</fsummary>
+ <desc>
+ <marker id="erl_drv_busy_msgq_limits"></marker>
+ <p>Sets and gets limits that will be used for controling the
+ busy state of the port message queue.</p>
+ <p>The port message queue will be set into a busy
+ state when the amount of command data queued on the
+ message queue reaches the <c>high</c> limit. The port
+ message queue will be set into a not busy state when the
+ amount of command data queued on the message queue falls
+ below the <c>low</c> limit. Command data is in this
+ context data passed to the port using either
+ <c>Port ! {Owner, {command, Data}}</c>, or
+ <c>port_command/[2,3]</c>. Note that these limits
+ only concerns command data that have not yet reached the
+ port. The <seealso marker="#set_busy_port">busy port</seealso>
+ feature can be used for data that has reached the port.</p>
+
+ <p>Valid limits are values in the range
+ <c>[ERL_DRV_BUSY_MSGQ_LIM_MIN, ERL_DRV_BUSY_MSGQ_LIM_MAX]</c>.
+ Limits will be automatically adjusted to be sane. That is,
+ the system will adjust values so that the low limit used is
+ lower or equal to the high limit used. By default the high
+ limit will be 8 kB and the low limit will be 4 kB.</p>
+
+ <p>By passing a pointer to an integer variable containing
+ the value <c>ERL_DRV_BUSY_MSGQ_READ_ONLY</c>, currently used
+ limit will be read and written back to the integer variable.
+ A new limit can be set by passing a pointer to an integer
+ variable containing a valid limit. The passed value will be
+ written to the internal limit. The internal limit will then
+ be adjusted. After this the adjusted limit will be written
+ back to the integer variable from which the new value was
+ read. Values are in bytes.</p>
+
+ <p>The busy message queue feature can be disabled either
+ by setting the <c>ERL_DRV_FLAG_NO_BUSY_MSGQ</c>
+ <seealso marker="driver_entry#driver_flags">driver flag</seealso>
+ in the <seealso marker="driver_entry">driver_entry</seealso>
+ used by the driver, or by calling this function with
+ <c>ERL_DRV_BUSY_MSGQ_DISABLED</c> as a limit (either low or
+ high). When this feature has been disabled it cannot be
+ enabled again. When reading the limits both of them
+ will be <c>ERL_DRV_BUSY_MSGQ_DISABLED</c>, if this
+ feature has been disabled.</p>
+
+ <p>Processes sending command data to the port will be suspended
+ if either the port is busy or if the port message queue is
+ busy. Suspended processes will be resumed when neither the
+ port is busy, nor the port message queue is busy.</p>
+
+ <p>For information about busy port functionality
+ see the documentation of the
+ <seealso marker="#set_busy_port">set_busy_port()</seealso>
+ function.</p>
+ </desc>
+ </func>
+ <func>
<name><ret>void</ret><nametext>set_busy_port(ErlDrvPort port, int on)</nametext></name>
<fsummary>Signal or unsignal port as busy</fsummary>
<desc>
<marker id="set_busy_port"></marker>
- <p>This function set and resets the busy status of the port. If
- <c>on</c> is 1, the port is set to busy, if it's 0 the port
- is set to not busy.</p>
- <p>When the port is busy, sending to it with <c>Port ! Data</c>
- or <c>port_command/2</c>, will block the port owner process,
- until the port is signaled as not busy.</p>
+ <p>This function set and unset the busy state of the port. If
+ <c>on</c> is non-zero, the port is set to busy, if it's zero the port
+ is set to not busy. You typically want to combine
+ this feature with the <seealso marker="#erl_drv_busy_msgq_limits">busy
+ port message queue</seealso> functionality.</p>
+ <p>Processes sending command data to the port will be suspended
+ if either the port is busy or if the port message queue
+ is busy. Suspended processes will be resumed when neither the
+ port is busy, nor the port message queue is busy. Command data
+ is in this context data passed to the port using either
+ <c>Port ! {Owner, {command, Data}}</c>, or
+ <c>port_command/[2,3]</c>.</p>
<p>If the
<seealso marker="driver_entry#driver_flags"><![CDATA[ERL_DRV_FLAG_SOFT_BUSY]]></seealso>
has been set in the
@@ -1518,6 +1583,10 @@ typedef struct ErlIOVec {
<seealso marker="erlang#port_command/3">port_command(Port, Data, [force])</seealso>
even though the driver has signaled that it is busy.
</p>
+ <p>For information about busy port message queue functionality
+ see the documentation of the
+ <seealso marker="#erl_drv_busy_msgq_limits">erl_drv_busy_msgq_limits()</seealso>
+ function.</p>
</desc>
</func>
<func>
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index a939b90eac..fd5efd89f1 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -2039,7 +2039,7 @@ erts_dist_command(Port *prt, int reds_limit)
if (reds > reds_limit)
goto preempted;
- if (!(sched_flags & ERTS_PTS_FLG_BUSY) && foq.first) {
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -2057,7 +2057,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
- if (sched_flags & ERTS_PTS_FLG_BUSY)
+ if (sched_flags & ERTS_PTS_FLG_BUSY_PORT)
break;
} while (foq.first && !preempt);
if (!foq.first)
@@ -2066,7 +2066,7 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
}
- if (sched_flags & ERTS_PTS_FLG_BUSY) {
+ if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) {
if (oq.first) {
ErtsDistOutputBuf *ob;
int preempt;
@@ -2139,7 +2139,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT);
- if ((sched_flags & ERTS_PTS_FLG_BUSY) && oq.first && !preempt)
+ if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt)
goto finalize_only;
}
@@ -2168,7 +2168,7 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (!(sched_flags & ERTS_PTS_FLG_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
&& (dep->qflgs & ERTS_DE_QFLG_BUSY)
&& dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
@@ -2692,6 +2692,15 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits((ErlDrvPort) pp, &disable, NULL);
+ }
+
pp->dist_entry = dep;
dep->version = version;
diff --git a/erts/emulator/beam/erl_driver.h b/erts/emulator/beam/erl_driver.h
index fb9e92e44b..cbc7bd4ede 100644
--- a/erts/emulator/beam/erl_driver.h
+++ b/erts/emulator/beam/erl_driver.h
@@ -157,6 +157,7 @@ typedef struct {
#define ERL_DRV_FLAG_USE_PORT_LOCKING (1 << 0)
#define ERL_DRV_FLAG_SOFT_BUSY (1 << 1)
+#define ERL_DRV_FLAG_NO_BUSY_MSGQ (1 << 2)
/*
* Integer types
@@ -384,9 +385,18 @@ typedef struct erl_drv_entry {
ErlDrvEntry* driver_init(void)
#endif
+#define ERL_DRV_BUSY_MSGQ_DISABLED (~((ErlDrvSizeT) 0))
+#define ERL_DRV_BUSY_MSGQ_READ_ONLY ((ErlDrvSizeT) 0)
+#define ERL_DRV_BUSY_MSGQ_LIM_MAX (ERL_DRV_BUSY_MSGQ_DISABLED - 1)
+#define ERL_DRV_BUSY_MSGQ_LIM_MIN ((ErlDrvSizeT) 1)
+
/*
* These are the functions available for driver writers.
*/
+EXTERN void erl_drv_busy_msgq_limits(ErlDrvPort port,
+ ErlDrvSizeT *low,
+ ErlDrvSizeT *high);
+
EXTERN int driver_select(ErlDrvPort port, ErlDrvEvent event, int mode, int on);
EXTERN int driver_event(ErlDrvPort port, ErlDrvEvent event,
ErlDrvEventData event_data);
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 1eb6dea710..67c8c28f3f 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -761,15 +761,36 @@ erts_port_driver_callback_epilogue(Port *prt, erts_aint32_t *statep)
#endif /* #if ERTS_GLB_INLINE_INCL_FUNC_DEF */
+void erts_port_resume_procs(Port *);
+
struct binary;
-#define ERTS_P2P_SIG_DATA_FLG_BANG_OP (1 << 0)
-#define ERTS_P2P_SIG_DATA_FLG_REPLY (1 << 1)
-#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND (1 << 2)
-#define ERTS_P2P_SIG_DATA_FLG_FORCE (1 << 3)
-#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT (1 << 4)
-#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK (1 << 5)
-#define ERTS_P2P_SIG_DATA_FLG_SCHED (1 << 6)
+#define ERTS_P2P_SIG_TYPE_BAD 0
+#define ERTS_P2P_SIG_TYPE_OUTPUT 1
+#define ERTS_P2P_SIG_TYPE_OUTPUTV 2
+#define ERTS_P2P_SIG_TYPE_CONNECT 3
+#define ERTS_P2P_SIG_TYPE_EXIT 4
+#define ERTS_P2P_SIG_TYPE_CONTROL 5
+#define ERTS_P2P_SIG_TYPE_CALL 6
+#define ERTS_P2P_SIG_TYPE_INFO 7
+#define ERTS_P2P_SIG_TYPE_LINK 8
+#define ERTS_P2P_SIG_TYPE_UNLINK 9
+#define ERTS_P2P_SIG_TYPE_SET_DATA 10
+#define ERTS_P2P_SIG_TYPE_GET_DATA 11
+
+#define ERTS_P2P_SIG_TYPE_BITS 4
+#define ERTS_P2P_SIG_TYPE_MASK \
+ ((1 << ERTS_P2P_SIG_TYPE_BITS) - 1)
+
+#define ERTS_P2P_SIG_DATA_FLG(N) \
+ (1 << (ERTS_P2P_SIG_TYPE_BITS + (N)))
+#define ERTS_P2P_SIG_DATA_FLG_BANG_OP ERTS_P2P_SIG_DATA_FLG(0)
+#define ERTS_P2P_SIG_DATA_FLG_REPLY ERTS_P2P_SIG_DATA_FLG(1)
+#define ERTS_P2P_SIG_DATA_FLG_NOSUSPEND ERTS_P2P_SIG_DATA_FLG(2)
+#define ERTS_P2P_SIG_DATA_FLG_FORCE ERTS_P2P_SIG_DATA_FLG(3)
+#define ERTS_P2P_SIG_DATA_FLG_BAD_OUTPUT ERTS_P2P_SIG_DATA_FLG(4)
+#define ERTS_P2P_SIG_DATA_FLG_BROKEN_LINK ERTS_P2P_SIG_DATA_FLG(5)
+#define ERTS_P2P_SIG_DATA_FLG_SCHED ERTS_P2P_SIG_DATA_FLG(6)
struct ErtsProc2PortSigData_ {
int flags;
@@ -823,6 +844,35 @@ struct ErtsProc2PortSigData_ {
} u;
} ;
+ERTS_GLB_INLINE int
+erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp);
+ERTS_GLB_INLINE ErlDrvSizeT
+erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp);
+
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE int
+erts_proc2port_sig_is_command_op(ErtsProc2PortSigData *sigdp)
+{
+ switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
+ case ERTS_P2P_SIG_TYPE_OUTPUT: return !0;
+ case ERTS_P2P_SIG_TYPE_OUTPUTV: return !0;
+ default: return 0;
+ }
+}
+
+ERTS_GLB_INLINE ErlDrvSizeT
+erts_proc2port_sig_command_data_size(ErtsProc2PortSigData *sigdp)
+{
+ switch (sigdp->flags & ERTS_P2P_SIG_TYPE_MASK) {
+ case ERTS_P2P_SIG_TYPE_OUTPUT: return sigdp->u.output.size;
+ case ERTS_P2P_SIG_TYPE_OUTPUTV: return sigdp->u.outputv.evp->size;
+ default: return (ErlDrvSizeT) 0;
+ }
+}
+
+#endif
+
#define ERTS_PROC2PORT_SIG_EXEC 0
#define ERTS_PROC2PORT_SIG_ABORT 1
#define ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND 2
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index b6b113b753..89f6a4e774 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -539,103 +539,257 @@ set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
}
}
+
/*
- * No-suspend handles.
+ * Busy port queue management
*/
-#ifdef ERTS_SMP
-static void
-free_port_task_handle_list(void *vpthlp)
+static erts_aint32_t
+check_unset_busy_port_q(Port *pp,
+ erts_aint32_t flags,
+ ErtsPortTaskBusyPortQ *bpq)
{
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
-}
-#endif
+ ErlDrvSizeT qsize, low;
+ int resume_procs = 0;
-static void
-schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
-{
-#ifdef ERTS_SMP
- erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
- (void *) pthlp,
- &pthlp->u.release);
-#else
- erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
-#endif
+ ASSERT(bpq);
+ ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(pp));
+
+ erts_port_task_sched_lock(&pp->sched);
+ qsize = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ if (qsize < low) {
+ erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags, mask);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
+ flags = erts_smp_atomic32_read_band_relb(&pp->sched.flags,
+ ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ erts_port_task_sched_unlock(&pp->sched);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+
+ return flags;
}
static ERTS_INLINE void
-abort_nosuspend_task(ErtsPortTaskType type,
- ErtsPortTaskTypeData *tdp)
+aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- if (type != ERTS_PORT_TASK_PROC_SIG)
- ERTS_INTERNAL_ERROR("Invalid no-suspend port task type");
+ ASSERT(pp->sched.taskq.bpq);
- tdp->psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
- &tdp->psig.data);
-}
+ if (size == 0)
+ return;
+ bpq = pp->sched.taskq.bpq;
-static void
-save_nosuspend_handle(Port *pp, ErtsPortTask *ptp)
-{
- erts_aint32_t act;
- ErtsPortTaskHandleList *pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
- sizeof(ErtsPortTaskHandleList));
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ ASSERT(pp->sched.taskq.bpq);
+ if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
+ | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low))
+ erts_smp_atomic32_read_bor_nob(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+}
- set_handle(ptp, &pthlp->handle);
+static ERTS_INLINE void
+dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
+{
+ ErtsPortTaskBusyPortQ *bpq;
+ erts_aint32_t flags;
+ ErlDrvSizeT qsz;
- ASSERT(ptp == handle2task(&pthlp->handle));
- ASSERT(ptp->u.alive.handle == &pthlp->handle);
+ ASSERT(pp->sched.taskq.bpq);
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (size == 0)
+ return;
- if (!(act & ERTS_PTS_FLG_BUSY)) {
+ bpq = pp->sched.taskq.bpq;
- erts_port_task_sched_lock(&pp->sched);
- pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
- pp->sched.taskq.local.busy.nosuspend = pthlp;
- erts_port_task_sched_unlock(&pp->sched);
+ qsz = (ErlDrvSizeT) erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) -size);
+ ASSERT(qsz + size > qsz);
+ flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ return;
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->low))
+ check_unset_busy_port_q(pp, flags, bpq);
+}
- act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+static ERTS_INLINE erts_aint32_t
+enqueue_proc2port_data(Port *pp,
+ ErtsProc2PortSigData *sigdp,
+ erts_aint32_t flags)
+{
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ if (sigdp && bpq) {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ if (size) {
+ erts_aint_t asize = erts_smp_atomic_add_read_acqb(&bpq->size,
+ (erts_aint_t) size);
+ ErlDrvSizeT qsz = (ErlDrvSizeT) asize;
+
+ ASSERT(qsz - size < qsz);
+
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
+ flags = erts_smp_atomic32_read_bor_acqb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
+ qsz = (ErlDrvSizeT) erts_smp_atomic_read_acqb(&bpq->size);
+ if (qsz < (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low)) {
+ flags = (erts_smp_atomic32_read_bor_relb(
+ &pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
+ flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
+ }
+ }
+ ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
+ }
+ }
+ return flags;
+}
- while (1) {
- erts_aint32_t exp, new;
+/*
+ * erl_drv_busy_msgq_limits() is called by drivers either reading or
+ * writing the limits.
+ *
+ * A limit of zero is interpreted as a read only request (using a
+ * limit of zero would not be useful). Other values are interpreted
+ * as a write-read request.
+ */
- if ((act & (ERTS_PTS_FLG_BUSY|ERTS_PTS_FLG_HAVE_NS_TASKS))
- == ERTS_PTS_FLG_HAVE_NS_TASKS)
- return;
+void
+erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
+{
+ Port *pp = erts_drvport2port(dport, NULL);
+ ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
+ int written = 0, resume_procs = 0;
+ ErlDrvSizeT low, high;
+
+ if (!pp || !bpq) {
+ if (lowp)
+ *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ if (highp)
+ *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
+ return;
+ }
- if (act & ERTS_PTS_FLG_BUSY) {
- erts_aint32_t s;
- s = erts_smp_atomic32_cmpxchg_nob(&ptp->state,
- ERTS_PT_STATE_ABORTED,
- ERTS_PT_STATE_SCHEDULED);
- if (s == ERTS_PT_STATE_SCHEDULED) {
- reset_port_task_handle(&pthlp->handle);
- break; /* Abort task */
- }
- /* Else: someone else handled it */
- return;
- }
+ low = lowp ? *lowp : 0;
+ high = highp ? *highp : 0;
- new = exp = act;
+ erts_port_task_sched_lock(&pp->sched);
- new |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+ if (low == ERL_DRV_BUSY_MSGQ_DISABLED
+ || high == ERL_DRV_BUSY_MSGQ_DISABLED) {
+ /* Disable busy msgq feature */
+ erts_aint32_t flags;
+ pp->sched.taskq.bpq = NULL;
+ flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
+ flags = erts_smp_atomic32_read_band_acqb(&pp->sched.flags, flags);
+ if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
+ resume_procs = 1;
+ }
+ else {
- act = erts_smp_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
- if (act == exp)
- return;
+ if (!low)
+ low = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->low);
+ else {
+ if (bpq->high < low)
+ bpq->high = low;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ written = 1;
+ }
+
+ if (!high)
+ high = bpq->high;
+ else {
+ if (low > high) {
+ low = high;
+ erts_smp_atomic_set_relb(&bpq->low, (erts_aint_t) low);
+ }
+ bpq->high = high;
+ written = 1;
+ }
+ if (written) {
+ ErlDrvSizeT size = (ErlDrvSizeT) erts_smp_atomic_read_nob(&bpq->size);
+ if (size > high)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_BUSY_PORT_Q);
+ else if (size < low)
+ erts_smp_atomic32_read_bor_relb(&pp->sched.flags,
+ ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
}
+ erts_port_task_sched_unlock(&pp->sched);
- abort_nosuspend_task(ptp->type, &ptp->u.alive.td);
+ if (resume_procs)
+ erts_port_resume_procs(pp);
+ if (lowp)
+ *lowp = low;
+ if (highp)
+ *highp = high;
}
+/*
+ * No-suspend handles.
+ */
+
+#ifdef ERTS_SMP
+static void
+free_port_task_handle_list(void *vpthlp)
+{
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
+}
+#endif
+
+static void
+schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
+{
+#ifdef ERTS_SMP
+ erts_schedule_thr_prgr_later_op(free_port_task_handle_list,
+ (void *) pthlp,
+ &pthlp->u.release);
+#else
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, pthlp);
+#endif
+}
+
+static ERTS_INLINE void
+abort_nosuspend_task(Port *pp,
+ ErtsPortTaskType type,
+ ErtsPortTaskTypeData *tdp)
+{
+
+ ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
+
+ if (!pp->sched.taskq.bpq)
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
+ tdp->psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND,
+ &tdp->psig.data);
+ aborted_proc2port_data(pp, size);
+ }
+}
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
@@ -724,14 +878,29 @@ pop_port(ErtsRunQueue *runq)
* Task queue operations
*/
-static ERTS_INLINE erts_aint32_t
-enqueue_task(Port *pp, ErtsPortTask *ptp)
+static ERTS_INLINE int
+enqueue_task(Port *pp,
+ ErtsPortTask *ptp,
+ ErtsProc2PortSigData *sigdp,
+ ErtsPortTaskHandleList *ns_pthlp,
+ erts_aint32_t *flagsp)
+
{
+ int res;
+ erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
erts_aint32_t flags;
ptp->u.alive.next = NULL;
+ if (ns_pthlp)
+ fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
erts_port_task_sched_lock(&pp->sched);
flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!(flags & ERTS_PTS_FLG_EXIT)) {
+ if (flags & fail_flags)
+ res = 0;
+ else {
+ if (ns_pthlp) {
+ ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
+ pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
+ }
if (pp->sched.taskq.in.last) {
ASSERT(pp->sched.taskq.in.first);
ASSERT(!pp->sched.taskq.in.last->u.alive.next);
@@ -744,9 +913,12 @@ enqueue_task(Port *pp, ErtsPortTask *ptp)
pp->sched.taskq.in.first = ptp;
}
pp->sched.taskq.in.last = ptp;
+ flags = enqueue_proc2port_data(pp, sigdp, flags);
+ res = 1;
}
erts_port_task_sched_unlock(&pp->sched);
- return flags;
+ *flagsp = flags;
+ return res;
}
static ERTS_INLINE void
@@ -754,7 +926,7 @@ prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t act = erts_smp_atomic32_read_nob(&pp->sched.flags);
- if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY)) {
+ if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
*execqp = pp->sched.taskq.local.first;
*processing_busy_q_p = 0;
}
@@ -799,10 +971,9 @@ finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
*execq = NULL;
- /* guess a likely value */
- act = ERTS_PTS_FLG_EXEC;
- if (execq)
- act |= ERTS_PTS_FLG_HAVE_TASKS;
+ act = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
while (1) {
erts_aint32_t new, exp;
@@ -829,9 +1000,12 @@ select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
erts_aint32_t flags = erts_smp_atomic32_read_nob(&pp->sched.flags);
+ if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
+ flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);
+
ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
- if (flags & ERTS_PTS_FLG_BUSY) {
+ if (flags & ERTS_PTS_FLG_BUSY_PORT) {
if (*processing_busy_q_p) {
ErtsPortTask *ptp;
@@ -880,7 +1054,7 @@ check_task_for_exec(Port *pp,
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
- if ((flags & ERTS_PTS_FLG_BUSY)
+ if ((flags & ERTS_PTS_FLG_BUSY_PORT)
&& (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
busy_wait_move_to_busy_queue(pp, ptp);
@@ -902,7 +1076,7 @@ check_task_for_exec(Port *pp,
else {
/* Processing busy queue */
- ASSERT(!(flags & ERTS_PTS_FLG_BUSY));
+ ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));
ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);
@@ -1019,10 +1193,7 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
erts_smp_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
break;
case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT,
- &ptp->u.alive.td.psig.data);
+ ERTS_INTERNAL_ERROR("Aborted process to port signal");
break;
default:
break;
@@ -1042,14 +1213,15 @@ erts_port_task_abort(ErtsPortTaskHandle *pthp)
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
+ erts_aint32_t flags;
ErtsPortTaskHandleList *abort_list;
#ifdef ERTS_SMP
ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;
#endif
erts_port_task_sched_lock(&pp->sched);
- erts_smp_atomic32_read_band_nob(&pp->sched.flags,
- ~ERTS_PTS_FLG_HAVE_NS_TASKS);
+ flags = erts_smp_atomic32_read_band_nob(&pp->sched.flags,
+ ~ERTS_PTS_FLG_HAVE_NS_TASKS);
abort_list = pp->sched.taskq.local.busy.nosuspend;
pp->sched.taskq.local.busy.nosuspend = NULL;
erts_port_task_sched_unlock(&pp->sched);
@@ -1117,7 +1289,7 @@ erts_port_task_abort_nosuspend_tasks(Port *pp)
#endif
schedule_port_task_handle_list_free(pthlp);
- abort_nosuspend_task(type, &td);
+ abort_nosuspend_task(pp, type, &td);
}
}
@@ -1131,6 +1303,8 @@ erts_port_task_schedule(Eterm id,
ErtsPortTaskType type,
...)
{
+ ErtsProc2PortSigData *sigdp = NULL;
+ ErtsPortTaskHandleList *ns_pthlp = NULL;
#ifdef ERTS_SMP
ErtsRunQueue *xrunq;
ErtsThrPrgrDelayHandle dhndl;
@@ -1138,7 +1312,7 @@ erts_port_task_schedule(Eterm id,
ErtsRunQueue *runq;
Port *pp;
ErtsPortTask *ptp = NULL;
- erts_aint32_t act;
+ erts_aint32_t act, add_flags;
if (pthp && erts_port_task_is_scheduled(pthp)) {
ASSERT(0);
@@ -1195,7 +1369,6 @@ erts_port_task_schedule(Eterm id,
break;
}
case ERTS_PORT_TASK_PROC_SIG: {
- ErtsProc2PortSigData *sigdp;
va_list argp;
ASSERT(!pthp);
va_start(argp, type);
@@ -1204,31 +1377,40 @@ erts_port_task_schedule(Eterm id,
ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
ptp->u.alive.flags |= va_arg(argp, int);
va_end(argp);
- if (ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND)
- save_nosuspend_handle(pp, ptp);
- else
+ if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
set_handle(ptp, pthp);
+ else {
+ ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
+ sizeof(ErtsPortTaskHandleList));
+ set_handle(ptp, &ns_pthlp->handle);
+ }
break;
}
default:
break;
}
- act = enqueue_task(pp, ptp);
- if (act & ERTS_PTS_FLG_EXIT) {
+ if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
reset_handle(ptp);
- goto fail;
+ if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
+ goto abort_nosuspend;
+ else
+ goto fail;
}
+ add_flags = ERTS_PTS_FLG_HAVE_TASKS;
+ if (ns_pthlp)
+ add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;
+
while (1) {
erts_aint32_t new, exp;
- if ((act & ERTS_PTS_FLG_HAVE_TASKS)
+ if ((act & add_flags) == add_flags
&& (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
goto done; /* Done */
new = exp = act;
- new |= ERTS_PTS_FLG_HAVE_TASKS;
+ new |= add_flags;
if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
new |= ERTS_PTS_FLG_IN_RUNQ;
@@ -1281,6 +1463,22 @@ done:
return 0;
+abort_nosuspend:
+
+#ifdef ERTS_SMP
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_port_dec_refc(pp);
+#endif
+
+ abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td);
+
+ ASSERT(ns_pthlp);
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+ if (ptp)
+ port_task_free(ptp);
+
+ return 0;
+
fail:
#ifdef ERTS_SMP
@@ -1288,6 +1486,9 @@ fail:
erts_port_dec_refc(pp);
#endif
+ if (ns_pthlp)
+ erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);
+
if (ptp)
port_task_free(ptp);
@@ -1444,13 +1645,24 @@ erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
ptp->u.alive.td.io.event_data);
io_tasks_executed++;
break;
- case ERTS_PORT_TASK_PROC_SIG:
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
- reds += ptp->u.alive.td.psig.callback(pp,
- state,
- ERTS_PROC2PORT_SIG_EXEC,
- &ptp->u.alive.td.psig.data);
+ if (!pp->sched.taskq.bpq)
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ reds += ptp->u.alive.td.psig.callback(pp,
+ state,
+ ERTS_PROC2PORT_SIG_EXEC,
+ sigdp);
+ dequeued_proc2port_data(pp, size);
+ }
break;
+ }
case ERTS_PORT_TASK_DIST_CMD:
reds += erts_dist_command(pp, CONTEXT_REDS-reds);
break;
@@ -1633,12 +1845,23 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp)
break;
case ERTS_PORT_TASK_DIST_CMD:
break;
- case ERTS_PORT_TASK_PROC_SIG:
- ptp->u.alive.td.psig.callback(NULL,
- ERTS_PORT_SFLG_INVALID,
- ERTS_PROC2PORT_SIG_ABORT_CLOSED,
- &ptp->u.alive.td.psig.data);
+ case ERTS_PORT_TASK_PROC_SIG: {
+ ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
+ if (!pp->sched.taskq.bpq)
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ else {
+ ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
+ ptp->u.alive.td.psig.callback(NULL,
+ ERTS_PORT_SFLG_INVALID,
+ ERTS_PROC2PORT_SIG_ABORT_CLOSED,
+ sigdp);
+ aborted_proc2port_data(pp, size);
+ }
break;
+ }
default:
erl_exit(ERTS_ABORT_EXIT,
"Invalid port task type: %d\n",
diff --git a/erts/emulator/beam/erl_port_task.h b/erts/emulator/beam/erl_port_task.h
index c41f8104c7..9976604042 100644
--- a/erts/emulator/beam/erl_port_task.h
+++ b/erts/emulator/beam/erl_port_task.h
@@ -66,16 +66,20 @@ typedef enum {
extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
#endif
-#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
-#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
-#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
-#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
-#define ERTS_PTS_FLG_BUSY (((erts_aint32_t) 1) << 4)
-#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 5)
-#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 6)
-#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 7)
-#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 8)
+#define ERTS_PTS_FLG_IN_RUNQ (((erts_aint32_t) 1) << 0)
+#define ERTS_PTS_FLG_EXEC (((erts_aint32_t) 1) << 1)
+#define ERTS_PTS_FLG_HAVE_TASKS (((erts_aint32_t) 1) << 2)
+#define ERTS_PTS_FLG_EXIT (((erts_aint32_t) 1) << 3)
+#define ERTS_PTS_FLG_BUSY_PORT (((erts_aint32_t) 1) << 4)
+#define ERTS_PTS_FLG_BUSY_PORT_Q (((erts_aint32_t) 1) << 5)
+#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q (((erts_aint32_t) 1) << 6)
+#define ERTS_PTS_FLG_HAVE_BUSY_TASKS (((erts_aint32_t) 1) << 7)
+#define ERTS_PTS_FLG_HAVE_NS_TASKS (((erts_aint32_t) 1) << 8)
+#define ERTS_PTS_FLG_PARALLELISM (((erts_aint32_t) 1) << 9)
+#define ERTS_PTS_FLG_FORCE_SCHED (((erts_aint32_t) 1) << 10)
+#define ERTS_PTS_FLGS_BUSY \
+ (ERTS_PTS_FLG_BUSY_PORT | ERTS_PTS_FLG_BUSY_PORT_Q)
#define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP \
(ERTS_PTS_FLG_EXIT \
@@ -84,6 +88,15 @@ extern erts_smp_atomic_t erts_port_task_outstanding_io_tasks;
| ERTS_PTS_FLG_EXEC \
| ERTS_PTS_FLG_FORCE_SCHED)
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH 8192
+#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW 4096
+
+typedef struct {
+ ErlDrvSizeT high;
+ erts_smp_atomic_t low;
+ erts_smp_atomic_t size;
+} ErtsPortTaskBusyPortQ;
+
typedef struct ErtsPortTask_ ErtsPortTask;
typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable;
typedef struct ErtsPortTaskHandleList_ ErtsPortTaskHandleList;
@@ -104,6 +117,7 @@ typedef struct {
ErtsPortTask *first;
ErtsPortTask *last;
} in;
+ ErtsPortTaskBusyPortQ *bpq;
} taskq;
erts_smp_atomic32_t flags;
#ifdef ERTS_SMP
@@ -113,6 +127,8 @@ typedef struct {
ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq);
ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
Eterm id);
ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
@@ -138,6 +154,18 @@ erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
return ((void *) erts_smp_atomic_read_nob(pthp)) != NULL;
}
+ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
+ ErtsPortTaskBusyPortQ *bpq)
+{
+ if (bpq) {
+ erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW;
+ erts_smp_atomic_init_nob(&bpq->low, low);
+ bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH;
+ erts_smp_atomic_init_nob(&bpq->size, (erts_aint_t) 0);
+ }
+ ptsp->taskq.bpq = bpq;
+}
+
ERTS_GLB_INLINE void
erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
{
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 9024e9ab52..32f3a675fa 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -270,9 +270,10 @@ static Port *create_port(char *name,
Eterm pid,
int *enop)
{
+ ErtsPortTaskBusyPortQ *busy_port_queue;
Port *prt;
char *p;
- size_t port_size, size;
+ size_t port_size, busy_port_queue_size, size;
erts_aint32_t state = ERTS_PORT_SFLG_CONNECTED;
erts_aint32_t x_pts_flgs = 0;
#ifdef DEBUG
@@ -288,7 +289,13 @@ static Port *create_port(char *name,
}
else
#endif
- port_size = size = sizeof(Port);
+ port_size = size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
+
+ busy_port_queue_size
+ = ((driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
+ ? 0
+ : ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)));
+ size += busy_port_queue_size;
size += sys_strlen(name) + 1;
@@ -302,6 +309,13 @@ static Port *create_port(char *name,
prt = (Port *) p;
p += port_size;
+ if (!busy_port_queue_size)
+ busy_port_queue = NULL;
+ else {
+ busy_port_queue = (ErtsPortTaskBusyPortQ *) p;
+ p += busy_port_queue_size;
+ }
+
#ifdef ERTS_SMP
if (driver_lock) {
prt->lock = driver_lock;
@@ -320,6 +334,8 @@ static Port *create_port(char *name,
prt->cleanup = 0;
#endif
+ erts_port_task_pre_init_sched(&prt->sched, busy_port_queue);
+
prt->name = p;
sys_strcpy(p, name);
prt->drv_ptr = driver;
@@ -503,8 +519,7 @@ erts_save_suspend_process_on_port(Port *prt, Process *process)
erts_aint32_t flags;
erts_port_task_sched_lock(&prt->sched);
flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- saved = ((flags & (ERTS_PTS_FLG_BUSY
- | ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY);
+ saved = (flags & ERTS_PTS_FLGS_BUSY) && !(flags & ERTS_PTS_FLG_EXIT);
if (saved)
erts_proclist_store_last(&prt->suspended, erts_proclist_create(process));
erts_port_task_sched_unlock(&prt->sched);
@@ -1530,7 +1545,7 @@ bad_port_signal(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = (flags & ~ERTS_P2P_SIG_TYPE_MASK) | ERTS_P2P_SIG_TYPE_BAD;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -1736,7 +1751,7 @@ erts_port_output(Process *c_p,
erts_driver_t *drv = prt->drv_ptr;
size_t size;
int try_call;
- erts_aint32_t sched_flags, busy_flag, invalid_flags;
+ erts_aint32_t sched_flags, busy_flgs, invalid_flags;
int task_flags;
ErtsProc2PortSigCallback port_sig_callback;
ErlDrvBinary *cbin = NULL;
@@ -1747,10 +1762,10 @@ erts_port_output(Process *c_p,
| ERTS_PORT_SIG_FLG_NOSUSPEND
| ERTS_PORT_SIG_FLG_FORCE)) == 0);
- busy_flag = ((flags & ERTS_PORT_SIG_FLG_FORCE)
+ busy_flgs = ((flags & ERTS_PORT_SIG_FLG_FORCE)
? ((erts_aint32_t) 0)
- : ERTS_PTS_FLG_BUSY);
- invalid_flags = busy_flag;
+ : ERTS_PTS_FLGS_BUSY);
+ invalid_flags = busy_flgs;
if (!refp)
invalid_flags |= ERTS_PTS_FLG_PARALLELISM;
@@ -1759,10 +1774,10 @@ erts_port_output(Process *c_p,
*/
sched_flags = erts_smp_atomic32_read_nob(&prt->sched.flags);
- if (sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT))
- return ((sched_flags & busy_flag)
- ? ERTS_PORT_OP_BUSY
- : ERTS_PORT_OP_DROPPED);
+ if (sched_flags & (busy_flgs|ERTS_PTS_FLG_EXIT))
+ return ((sched_flags & ERTS_PTS_FLG_EXIT)
+ ? ERTS_PORT_OP_DROPPED
+ : ERTS_PORT_OP_BUSY);
try_call = !(sched_flags & (invalid_flags|ERTS_PTS_FLGS_FORCE_SCHEDULE_OP));
@@ -1949,6 +1964,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUTV;
sigdp->u.outputv.from = from;
sigdp->u.outputv.evp = evp;
sigdp->u.outputv.cbinp = cbin;
@@ -2038,6 +2054,7 @@ erts_port_output(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
+ sigdp->flags = ERTS_P2P_SIG_TYPE_OUTPUT;
sigdp->u.output.from = from;
sigdp->u.output.bufp = buf;
sigdp->u.output.size = size;
@@ -2045,7 +2062,7 @@ erts_port_output(Process *c_p,
}
task_flags = ERTS_PT_FLG_WAIT_BUSY;
- sigdp->flags = flags;
+ sigdp->flags |= flags;
if (flags & (ERTS_P2P_SIG_DATA_FLG_FORCE|ERTS_P2P_SIG_DATA_FLG_NOSUSPEND)) {
task_flags = 0;
if (flags & ERTS_P2P_SIG_DATA_FLG_FORCE)
@@ -2070,7 +2087,7 @@ erts_port_output(Process *c_p,
return res;
}
- if ((sched_flags & (busy_flag|ERTS_PTS_FLG_EXIT)) == ERTS_PTS_FLG_BUSY)
+ if (!(sched_flags & ERTS_PTS_FLG_EXIT) && (sched_flags & busy_flgs))
return ERTS_PORT_OP_BUSY_SCHEDULED;
return res;
@@ -2201,7 +2218,7 @@ erts_port_exit(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_EXIT | flags;
sigdp->u.exit.from = from;
if (is_immed(reason)) {
@@ -2367,7 +2384,7 @@ erts_port_connect(Process *c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = flags;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONNECT | flags;
sigdp->u.connect.from = from;
sigdp->u.connect.connected = connect_id;
@@ -2424,7 +2441,7 @@ erts_port_unlink(Process *c_p, Port *prt, Eterm from, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_UNLINK;
sigdp->u.unlink.from = from;
return erts_schedule_proc2port_signal(c_p,
@@ -2509,7 +2526,7 @@ erts_port_link(Process *c_p, Port *prt, Eterm to, Eterm *refp)
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_LINK;
sigdp->u.link.port = prt->common.id;
sigdp->u.link.to = to;
@@ -3851,7 +3868,7 @@ erts_port_control(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CONTROL;
sigdp->u.control.binp = binp;
sigdp->u.control.command = command;
sigdp->u.control.bufp = bufp;
@@ -4131,7 +4148,7 @@ erts_port_call(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_CALL;
sigdp->u.call.command = command;
sigdp->u.call.bufp = bufp;
sigdp->u.call.size = size;
@@ -4298,7 +4315,7 @@ erts_port_info(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_INFO;
sigdp->u.info.item = item;
return erts_schedule_proc2port_signal(c_p,
@@ -4383,7 +4400,7 @@ erts_port_set_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_SET_DATA;
sigdp->u.set_data.data = set_data;
sigdp->u.set_data.bp = bp;
@@ -4516,7 +4533,7 @@ erts_port_get_data(Process* c_p,
}
sigdp = erts_port_task_alloc_p2p_sig_data();
- sigdp->flags = 0;
+ sigdp->flags = ERTS_P2P_SIG_TYPE_GET_DATA;
return erts_schedule_proc2port_signal(c_p,
prt,
@@ -4608,8 +4625,8 @@ set_busy_port(ErlDrvPort dprt, int on)
if (on) {
flags = erts_smp_atomic32_read_bor_acqb(&prt->sched.flags,
- ERTS_PTS_FLG_BUSY);
- if (flags & ERTS_PTS_FLG_BUSY)
+ ERTS_PTS_FLG_BUSY_PORT);
+ if (flags & ERTS_PTS_FLG_BUSY_PORT)
return; /* Already busy */
if (flags & ERTS_PTS_FLG_HAVE_NS_TASKS)
@@ -4623,11 +4640,9 @@ set_busy_port(ErlDrvPort dprt, int on)
}
#endif
} else {
- ErtsProcList *plp;
-
flags = erts_smp_atomic32_read_band_acqb(&prt->sched.flags,
- ~ERTS_PTS_FLG_BUSY);
- if (!(flags & ERTS_PTS_FLG_BUSY))
+ ~ERTS_PTS_FLG_BUSY_PORT);
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT))
return; /* Already non-busy */
#ifdef USE_VM_PROBES
@@ -4645,52 +4660,60 @@ set_busy_port(ErlDrvPort dprt, int on)
erts_dist_port_not_busy(prt);
}
- /*
- * Resume, in a round-robin fashion, all processes waiting on the port.
- *
- * This version submitted by Tony Rogvall. The earlier version used
- * to resume the processes in order, which caused starvation of all but
- * the first process.
- */
+ if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
+ erts_port_resume_procs(prt);
+ }
+}
- erts_port_task_sched_lock(&prt->sched);
- plp = prt->suspended;
- prt->suspended = NULL;
- erts_port_task_sched_unlock(&prt->sched);
+void
+erts_port_resume_procs(Port *prt)
+{
+ /*
+ * Resume, in a round-robin fashion, all processes waiting on the port.
+ *
+ * This version submitted by Tony Rogvall. The earlier version used
+ * to resume the processes in order, which caused starvation of all but
+ * the first process.
+ */
+ ErtsProcList *plp;
- if (erts_proclist_fetch(&plp, NULL)) {
+ erts_port_task_sched_lock(&prt->sched);
+ plp = prt->suspended;
+ prt->suspended = NULL;
+ erts_port_task_sched_unlock(&prt->sched);
+
+ if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
- /*
- * Hrm, for blocked dist ports, plp always seems to be NULL.
- * That's not so fun.
- * Well, another way to get the same info is using a D
- * script to correlate an earlier process-port_blocked+pid
- * event with a later process-scheduled event. That's
- * subject to the multi-CPU races with how events are
- * handled, but hey, that way works most of the time.
- */
- if (DTRACE_ENABLED(process_port_unblocked)) {
- DTRACE_CHARBUF(pid_str, 16);
- ErtsProcList* plp2 = plp;
-
- erts_snprintf(port_str, sizeof(port_str),
- "%T", prt->common.id);
- while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
- DTRACE2(process_port_unblocked, pid_str, port_str);
- }
- }
-#endif
+ /*
+ * Hrm, for blocked dist ports, plp always seems to be NULL.
+ * That's not so fun.
+ * Well, another way to get the same info is using a D
+ * script to correlate an earlier process-port_blocked+pid
+ * event with a later process-scheduled event. That's
+ * subject to the multi-CPU races with how events are
+ * handled, but hey, that way works most of the time.
+ */
+ if (DTRACE_ENABLED(process_port_unblocked)) {
+ DTRACE_CHARBUF(port_str, 16);
+ DTRACE_CHARBUF(pid_str, 16);
+ ErtsProcList* plp2 = plp;
- /* First proc should be resumed last */
- if (plp->next) {
- plp->next->prev = NULL;
- erts_resume_processes(plp->next);
- plp->next = NULL;
+ erts_snprintf(port_str, sizeof(port_str), "%T", prt->common.id);
+ while (plp2 != NULL) {
+ erts_snprintf(pid_str, sizeof(pid_str), "%T", plp2->pid);
+ DTRACE2(process_port_unblocked, pid_str, port_str);
}
- erts_resume_processes(plp);
- }
+ }
+#endif
+
+ /* First proc should be resumed last */
+ if (plp->next) {
+ plp->next->prev = NULL;
+ erts_resume_processes(plp->next);
+ plp->next = NULL;
+ }
+ erts_resume_processes(plp);
}
}
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 4a87b0a385..f68ce68407 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -692,6 +692,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_LOPT_UDP_READ_PACKETS 33 /* Number of packets to read */
#define INET_OPT_RAW 34 /* Raw socket options */
#define INET_LOPT_TCP_SEND_TIMEOUT_CLOSE 35 /* auto-close on send timeout or not */
+#define INET_LOPT_TCP_MSGQ_HIWTRMRK 36 /* set local high watermark */
+#define INET_LOPT_TCP_MSGQ_LOWTRMRK 37 /* set local low watermark */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -808,6 +810,8 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_HIGH_WATERMARK (1024*8) /* 8k pending high => busy */
#define INET_LOW_WATERMARK (1024*4) /* 4k pending => allow more */
+#define INET_HIGH_MSGQ_WATERMARK (1024*8) /* 8k pending high => busy */
+#define INET_LOW_MSGQ_WATERMARK (1024*4) /* 4k pending => allow more */
#define INET_INFINITY 0xffffffff /* infinity value */
@@ -5537,6 +5541,28 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ high = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low;
+ if (ival < ERL_DRV_BUSY_MSGQ_LIM_MIN
+ || ERL_DRV_BUSY_MSGQ_LIM_MAX < ival)
+ return -1;
+ low = (ErlDrvSizeT) ival;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
tcp_descriptor* tdesc = (tcp_descriptor*) desc;
@@ -6422,6 +6448,32 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
}
continue;
+ case INET_LOPT_TCP_MSGQ_HIWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, NULL, &high);
+ ival = high > INT_MAX ? INT_MAX : (int) high;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
+ case INET_LOPT_TCP_MSGQ_LOWTRMRK:
+ if (desc->stype == SOCK_STREAM) {
+ ErlDrvSizeT low = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ *ptr++ = opt;
+ erl_drv_busy_msgq_limits(desc->port, &low, NULL);
+ ival = low > INT_MAX ? INT_MAX : (int) low;
+ put_int32(ival, ptr);
+ }
+ else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
case INET_LOPT_TCP_SEND_TIMEOUT:
if (desc->stype == SOCK_STREAM) {
*ptr++ = opt;
@@ -8029,6 +8081,7 @@ static int tcp_inet_init(void)
static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
{
+ ErlDrvSizeT q_low, q_high;
tcp_descriptor* desc;
DEBUGF(("tcp_inet_start(%ld) {\r\n", (long)port));
@@ -8038,6 +8091,17 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
return ERL_DRV_ERROR_ERRNO;
desc->high = INET_HIGH_WATERMARK;
desc->low = INET_LOW_WATERMARK;
+ q_high = INET_HIGH_MSGQ_WATERMARK;
+ q_low = INET_LOW_MSGQ_WATERMARK;
+ if (q_low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_low = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ if (q_high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MIN;
+ else if (q_high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+ q_high = ERL_DRV_BUSY_MSGQ_LIM_MAX;
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
desc->send_timeout = INET_INFINITY;
desc->send_timeout_close = 0;
desc->busy_on_send = 0;
@@ -8061,6 +8125,7 @@ static ErlDrvData tcp_inet_start(ErlDrvPort port, char* args)
static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
ErlDrvTermData owner, int* err)
{
+ ErlDrvSizeT q_low, q_high;
ErlDrvPort port = desc->inet.port;
tcp_descriptor* copy_desc;
@@ -8099,6 +8164,13 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
FREE(copy_desc);
return NULL;
}
+
+ /* Read busy msgq limits of parent */
+ q_low = q_high = ERL_DRV_BUSY_MSGQ_READ_ONLY;
+ erl_drv_busy_msgq_limits(desc->inet.port, &q_low, &q_high);
+ /* Write same busy msgq limits to child */
+ erl_drv_busy_msgq_limits(port, &q_low, &q_high);
+
copy_desc->inet.port = port;
copy_desc->inet.dport = driver_mk_port(port);
*err = 0;
diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam
index ae9c17bbcb..e5c1de86f4 100644
--- a/erts/preloaded/ebin/prim_inet.beam
+++ b/erts/preloaded/ebin/prim_inet.beam
Binary files differ
diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl
index 846ae97ed2..f7a9718cde 100644
--- a/erts/preloaded/src/prim_inet.erl
+++ b/erts/preloaded/src/prim_inet.erl
@@ -1071,6 +1071,8 @@ enc_opt(deliver) -> ?INET_LOPT_DELIVER;
enc_opt(exit_on_close) -> ?INET_LOPT_EXITONCLOSE;
enc_opt(high_watermark) -> ?INET_LOPT_TCP_HIWTRMRK;
enc_opt(low_watermark) -> ?INET_LOPT_TCP_LOWTRMRK;
+enc_opt(high_msgq_watermark) -> ?INET_LOPT_TCP_MSGQ_HIWTRMRK;
+enc_opt(low_msgq_watermark) -> ?INET_LOPT_TCP_MSGQ_LOWTRMRK;
enc_opt(bit8) -> ?INET_LOPT_BIT8;
enc_opt(send_timeout) -> ?INET_LOPT_TCP_SEND_TIMEOUT;
enc_opt(send_timeout_close) -> ?INET_LOPT_TCP_SEND_TIMEOUT_CLOSE;
@@ -1125,6 +1127,8 @@ dec_opt(?INET_LOPT_DELIVER) -> deliver;
dec_opt(?INET_LOPT_EXITONCLOSE) -> exit_on_close;
dec_opt(?INET_LOPT_TCP_HIWTRMRK) -> high_watermark;
dec_opt(?INET_LOPT_TCP_LOWTRMRK) -> low_watermark;
+dec_opt(?INET_LOPT_TCP_MSGQ_HIWTRMRK) -> high_msgq_watermark;
+dec_opt(?INET_LOPT_TCP_MSGQ_LOWTRMRK) -> low_msgq_watermark;
dec_opt(?INET_LOPT_BIT8) -> bit8;
dec_opt(?INET_LOPT_TCP_SEND_TIMEOUT) -> send_timeout;
dec_opt(?INET_LOPT_TCP_SEND_TIMEOUT_CLOSE) -> send_timeout_close;
@@ -1220,6 +1224,8 @@ type_opt_1(deliver) ->
type_opt_1(exit_on_close) -> bool;
type_opt_1(low_watermark) -> int;
type_opt_1(high_watermark) -> int;
+type_opt_1(low_msgq_watermark) -> int;
+type_opt_1(high_msgq_watermark) -> int;
type_opt_1(bit8) ->
{enum,[{clear, ?INET_BIT8_CLEAR},
{set, ?INET_BIT8_SET},
diff --git a/lib/kernel/doc/src/inet.xml b/lib/kernel/doc/src/inet.xml
index bf6c4cfb1a..bfdb163e17 100644
--- a/lib/kernel/doc/src/inet.xml
+++ b/lib/kernel/doc/src/inet.xml
@@ -487,6 +487,36 @@ fe80::204:acff:fe17:bf38
example <c>Size == 2</c>, the data received will match
<c>[Byte1,Byte2|Binary]</c>.</p>
</item>
+ <tag><c>{high_msgq_watermark, Size}</c> (TCP/IP sockets)</tag>
+ <item>
+ <p>The socket message queue will be set into a busy
+ state when the amount of data queued on the message
+ queue reaches this limit. Note that this limit only
+ concerns data that have not yet reached the ERTS internal
+ socket implementation. Default value used is 8 kB.</p>
+ <p>Senders of data to the socket will be suspended if
+ either the socket message queue is busy, or the socket
+ itself is busy.</p>
+ <p>For more information see the <c>low_msgq_watermark</c>,
+ <c>high_watermark</c>, and <c>low_watermark</c> options.</p>
+ <p>Note that distribution sockets will disable the use of
+ <c>high_msgq_watermark</c> and <c>low_msgq_watermark</c>,
+ and will instead use the
+ <seealso marker="erts:erlang#system_info_dist_buf_busy_limit">distribution
+ buffer busy limit</seealso> which is a similar feature.</p>
+ </item>
+ <tag><c>{high_watermark, Size}</c> (TCP/IP sockets)</tag>
+ <item>
+ <p>The socket will be set into a busy state when the amount
+ of data queued internally by the ERTS socket implementation
+ reaches this limit. Default value used is 8 kB.</p>
+ <p>Senders of data to the socket will be suspended if
+ either the socket message queue is busy, or the socket
+ itself is busy.</p>
+ <p>For more information see the <c>low_watermark</c>,
+ <c>high_msgq_watermark</c>, and <c>low_msqg_watermark</c>
+ options.</p>
+ </item>
<tag><c>{keepalive, Boolean}</c>(TCP/IP sockets)</tag>
<item>
<p>Enables/disables periodic transmission on a connected
@@ -495,6 +525,40 @@ fe80::204:acff:fe17:bf38
considered broken and an error message will be sent to
the controlling process. Default disabled.</p>
</item>
+ <tag><c>{low_msgq_watermark, Size}</c> (TCP/IP sockets)</tag>
+ <item>
+ <p>If the socket message queue is in a busy state, the
+ socket message queue will be set in a not busy state when
+ the amount of data queued in the message queue falls
+ below this limit. Note that this limit only concerns data
+ that have not yet reached the ERTS internal socket
+ implementation. Default value used is 4 kB.</p>
+ <p>Senders that have been suspended due to either a
+ busy message queue or a busy socket, will be resumed
+ when neither the socket message queue, nor the socket
+ are busy.</p>
+ <p>For more information see the <c>high_msgq_watermark</c>,
+ <c>high_watermark</c>, and <c>low_watermark</c> options.</p>
+ <p>Note that distribution sockets will disable the use of
+ <c>high_msgq_watermark</c> and <c>low_msgq_watermark</c>,
+ and will instead use the
+ <seealso marker="erts:erlang#system_info_dist_buf_busy_limit">distribution
+ buffer busy limit</seealso> which is a similar feature.</p>
+ </item>
+ <tag><c>{low_watermark, Size}</c> (TCP/IP sockets)</tag>
+ <item>
+ <p>If the socket is in a busy state, the socket will
+ be set in a not busy state when the amount of data
+ queued internally by the ERTS socket implementation
+ falls below this limit. Default value used is 4 kB.</p>
+ <p>Senders that have been suspended due to either a
+ busy message queue or a busy socket, will be resumed
+ when neither the socket message queue, nor the socket
+ are busy.</p>
+ <p>For more information see the <c>high_watermark</c>,
+ <c>high_msgq_watermark</c>, and <c>low_msgq_watermark</c>
+ options.</p>
+ </item>
<tag><c>{nodelay, Boolean}</c>(TCP/IP sockets)</tag>
<item>
<p>If <c>Boolean == true</c>, the <c>TCP_NODELAY</c> option
diff --git a/lib/kernel/src/gen_tcp.erl b/lib/kernel/src/gen_tcp.erl
index ef6bfdf7f4..519c1bb8b4 100644
--- a/lib/kernel/src/gen_tcp.erl
+++ b/lib/kernel/src/gen_tcp.erl
@@ -38,9 +38,11 @@
{dontroute, boolean()} |
{exit_on_close, boolean()} |
{header, non_neg_integer()} |
+ {high_msgq_watermark, pos_integer()} |
{high_watermark, non_neg_integer()} |
{keepalive, boolean()} |
{linger, {boolean(), non_neg_integer()}} |
+ {low_msgq_watermark, pos_integer()} |
{low_watermark, non_neg_integer()} |
{mode, list | binary} | list | binary |
{nodelay, boolean()} |
@@ -68,9 +70,11 @@
dontroute |
exit_on_close |
header |
+ high_msgq_watermark |
high_watermark |
keepalive |
linger |
+ low_msgq_watermark |
low_watermark |
mode |
nodelay |
diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl
index abaf4486dc..e791bb135f 100644
--- a/lib/kernel/src/inet.erl
+++ b/lib/kernel/src/inet.erl
@@ -535,6 +535,7 @@ options() ->
buffer, header, active, packet, deliver, mode,
multicast_if, multicast_ttl, multicast_loop,
exit_on_close, high_watermark, low_watermark,
+ high_msgq_watermark, low_msgq_watermark,
bit8, send_timeout, send_timeout_close
].
@@ -552,8 +553,9 @@ stats() ->
connect_options() ->
[tos, priority, reuseaddr, keepalive, linger, sndbuf, recbuf, nodelay,
header, active, packet, packet_size, buffer, mode, deliver,
- exit_on_close, high_watermark, low_watermark, bit8, send_timeout,
- send_timeout_close, delay_send,raw].
+ exit_on_close, high_watermark, low_watermark, high_msgq_watermark,
+ low_msgq_watermark, bit8, send_timeout, send_timeout_close,
+ delay_send,raw].
connect_options(Opts, Family) ->
BaseOpts =
@@ -608,8 +610,9 @@ con_add(Name, Val, R, Opts, AllOpts) ->
listen_options() ->
[tos, priority, reuseaddr, keepalive, linger, sndbuf, recbuf, nodelay,
header, active, packet, buffer, mode, deliver, backlog,
- exit_on_close, high_watermark, low_watermark, bit8, send_timeout,
- send_timeout_close, delay_send, packet_size,raw].
+ exit_on_close, high_watermark, low_watermark, high_msgq_watermark,
+ low_msgq_watermark, bit8, send_timeout, send_timeout_close,
+ delay_send, packet_size,raw].
listen_options(Opts, Family) ->
BaseOpts =
diff --git a/lib/kernel/src/inet_int.hrl b/lib/kernel/src/inet_int.hrl
index cf893c73eb..2dfbf75d25 100644
--- a/lib/kernel/src/inet_int.hrl
+++ b/lib/kernel/src/inet_int.hrl
@@ -141,6 +141,8 @@
-define(INET_LOPT_READ_PACKETS, 33).
-define(INET_OPT_RAW, 34).
-define(INET_LOPT_TCP_SEND_TIMEOUT_CLOSE, 35).
+-define(INET_LOPT_TCP_MSGQ_HIWTRMRK, 36).
+-define(INET_LOPT_TCP_MSGQ_LOWTRMRK, 37).
% Specific SCTP options: separate range:
-define(SCTP_OPT_RTOINFO, 100).
-define(SCTP_OPT_ASSOCINFO, 101).